1+ # -*- coding: utf-8 -*-
2+
3+ # Licensed under the Apache License, Version 2.0 (the "License"); you may
4+ # not use this file except in compliance with the License. You may obtain
5+ # a copy of the License at
6+ #
7+ # http://www.apache.org/licenses/LICENSE-2.0
8+ #
9+ # Unless required by applicable law or agreed to in writing, software
10+ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+ # License for the specific language governing permissions and limitations
13+ # under the License.
14+
15+ """
16+ test_k8sclient
17+ ----------------------------------
18+
19+ Tests for `k8sclient` module. Deploy Kubernetes using:
20+ http://kubernetes.io/docs/getting-started-guides/docker/
21+
22+ and then run this test
23+ """
24+
25+ import unittest
26+ import urllib3
27+ import uuid
28+
29+ from kubernetes .client import api_client
30+ from kubernetes .client .apis import core_v1_api
31+
32+
33+ def _is_k8s_running ():
34+ try :
35+ urllib3 .PoolManager ().request ('GET' , '127.0.0.1:8080' )
36+ return True
37+ except urllib3 .exceptions .HTTPError :
38+ return False
39+
40+
41+ class TestK8sclient (unittest .TestCase ):
42+ @unittest .skipUnless (
43+ _is_k8s_running (), "Kubernetes is not available" )
44+ def test_list_endpoints (self ):
45+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
46+ api = core_v1_api .CoreV1Api (client )
47+
48+ endpoints = api .list_endpoints_for_all_namespaces ()
49+ self .assertTrue (len (endpoints .items ) > 0 )
50+
51+ @unittest .skipUnless (
52+ _is_k8s_running (), "Kubernetes is not available" )
53+ def test_pod_apis (self ):
54+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
55+ api = core_v1_api .CoreV1Api (client )
56+
57+ name = 'test-' + str (uuid .uuid4 ())
58+
59+ pod_manifest = {'apiVersion' : 'v1' ,
60+ 'kind' : 'Pod' ,
61+ 'metadata' : {'color' : 'blue' , 'name' : name },
62+ 'spec' : {'containers' : [{'image' : 'dockerfile/redis' ,
63+ 'name' : 'redis' }]}}
64+
65+ resp = api .create_namespaced_pod (body = pod_manifest ,
66+ namespace = 'default' )
67+ self .assertEqual (name , resp .metadata .name )
68+ self .assertTrue (resp .status .phase )
69+
70+ resp = api .read_namespaced_pod (name = name ,
71+ namespace = 'default' )
72+ self .assertEqual (name , resp .metadata .name )
73+ self .assertTrue (resp .status .phase )
74+
75+ number_of_pods = len (api .list_pod_for_all_namespaces ().items )
76+ self .assertTrue (number_of_pods > 0 )
77+
78+ resp = api .delete_namespaced_pod (name = name , body = {},
79+ namespace = 'default' )
80+
81+ @unittest .skipUnless (
82+ _is_k8s_running (), "Kubernetes is not available" )
83+ def test_service_apis (self ):
84+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
85+ api = core_v1_api .CoreV1Api (client )
86+
87+ service_manifest = {'apiVersion' : 'v1' ,
88+ 'kind' : 'Service' ,
89+ 'metadata' : {'labels' : {'name' : 'frontend' },
90+ 'name' : 'frontend' ,
91+ 'resourceversion' : 'v1' },
92+ 'spec' : {'ports' : [{'name' : 'port' ,
93+ 'port' : 80 ,
94+ 'protocol' : 'TCP' ,
95+ 'targetPort' : 80 }],
96+ 'selector' : {'name' : 'frontend' }}}
97+
98+ resp = api .create_namespaced_service (body = service_manifest ,
99+ namespace = 'default' )
100+ self .assertEqual ('frontend' , resp .metadata .name )
101+ self .assertTrue (resp .status )
102+
103+ resp = api .read_namespaced_service (name = 'frontend' ,
104+ namespace = 'default' )
105+ self .assertEqual ('frontend' , resp .metadata .name )
106+ self .assertTrue (resp .status )
107+
108+ # TODO(dims) : Fails with "json: cannot unmarshal object into
109+ # Go value of type jsonpatch.Patch"
110+ # service_manifest['spec']['ports'] = [{'name': 'new',
111+ # 'port': 8080,
112+ # 'protocol': 'TCP',
113+ # 'targetPort': 8080}]
114+ # resp = api.patch_namespaced_service(body=service_manifest,
115+ # name='frontend',
116+ # namespace='default')
117+ # self.assertEqual(2, len(resp.spec.ports))
118+ # self.assertTrue(resp.status)
119+
120+ resp = api .delete_namespaced_service (name = 'frontend' ,
121+ namespace = 'default' )
122+
123+ @unittest .skipUnless (
124+ _is_k8s_running (), "Kubernetes is not available" )
125+ def test_replication_controller_apis (self ):
126+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
127+ api = core_v1_api .CoreV1Api (client )
128+
129+ rc_manifest = {
130+ 'apiVersion' : 'v1' ,
131+ 'kind' : 'ReplicationController' ,
132+ 'metadata' : {'labels' : {'name' : 'frontend' },
133+ 'name' : 'frontend' },
134+ 'spec' : {'replicas' : 2 ,
135+ 'selector' : {'name' : 'frontend' },
136+ 'template' : {'metadata' : {
137+ 'labels' : {'name' : 'frontend' }},
138+ 'spec' : {'containers' : [{
139+ 'image' : 'nginx' ,
140+ 'name' : 'nginx' ,
141+ 'ports' : [{'containerPort' : 80 ,
142+ 'protocol' : 'TCP' }]}]}}}}
143+
144+ resp = api .create_namespaced_replication_controller (
145+ body = rc_manifest , namespace = 'default' )
146+ self .assertEqual ('frontend' , resp .metadata .name )
147+ self .assertEqual (2 , resp .spec .replicas )
148+
149+ resp = api .read_namespaced_replication_controller (
150+ name = 'frontend' , namespace = 'default' )
151+ self .assertEqual ('frontend' , resp .metadata .name )
152+ self .assertEqual (2 , resp .spec .replicas )
153+
154+ resp = api .delete_namespaced_replication_controller (
155+ name = 'frontend' , body = {}, namespace = 'default' )
156+
157+
158+ @unittest .skipUnless (
159+ _is_k8s_running (), "Kubernetes is not available" )
160+ def test_configmap_apis (self ):
161+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
162+ api = core_v1_api .CoreV1Api (client )
163+
164+ test_configmap = {
165+ "kind" : "ConfigMap" ,
166+ "apiVersion" : "v1" ,
167+ "metadata" : {
168+ "name" : "test-configmap" ,
169+ },
170+ "data" : {
171+ "config.json" : "{\" command\" :\" /usr/bin/mysqld_safe\" }" ,
172+ "frontend.cnf" : "[mysqld]\n bind-address = 10.0.0.3\n port = 3306\n "
173+ }
174+ }
175+
176+ resp = api .create_namespaced_config_map (
177+ body = test_configmap , namespace = 'default'
178+ )
179+ self .assertEqual ('test-configmap' , resp .metadata .name )
180+
181+ resp = api .read_namespaced_config_map (
182+ name = 'test-configmap' , namespace = 'default' )
183+ self .assertEqual ('test-configmap' , resp .metadata .name )
184+
185+ # TODO(dims): Fails with "json: cannot unmarshal object
186+ # into Go value of type jsonpatch.Patch"
187+ # test_configmap['data']['config.json'] = "{}"
188+ # resp = api.patch_namespaced_config_map(
189+ # name='test-configmap', namespace='default', body=test_configmap)
190+
191+ resp = api .delete_namespaced_config_map (
192+ name = 'test-configmap' , body = {}, namespace = 'default' )
193+
194+
195+ @unittest .skipUnless (
196+ _is_k8s_running (), "Kubernetes is not available" )
197+ def test_node_apis (self ):
198+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
199+ api = core_v1_api .CoreV1Api (client )
200+
201+ for item in api .list_node ().items :
202+ node = api .read_node (name = item .metadata .name )
203+ self .assertTrue (len (node .metadata .labels ) > 0 )
204+ self .assertTrue (isinstance (node .metadata .labels , dict ))
0 commit comments