1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2019 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
20 # Basic sanity tests for k8sclient functions
24 def test_parse_interval():
25 from k8sclient.k8sclient import _parse_interval
27 good_intervals = [{"in": input, "ex": expected}
28 for (input, expected) in [
34 ("24h", 24 * 60 * 60),
38 (1234567890123456789012345678901234567890,1234567890123456789012345678901234567890),
39 ("1234567890123456789012345678901234567890",1234567890123456789012345678901234567890),
40 ("1234567890123456789012345678901234567890s",1234567890123456789012345678901234567890),
42 ("00000000000000000000000000000000005m", 5 * 60)
67 "i want an interval of 30s",
84 for test_case in good_intervals:
85 assert _parse_interval(test_case["in"]) == test_case["ex"]
87 for interval in bad_intervals:
88 with pytest.raises(ValueError):
89 _parse_interval(interval)
91 def test_parse_ports():
92 from k8sclient.k8sclient import parse_ports
94 good_ports = [{"in": input, "ex": expected}
95 for (input, expected) in [
96 ({u'concat': [u'9101', u':', 0], u'ipv6': True}, (9101, 0, True, "TCP")),
97 ({u'concat': [u'9102', u':', 0], u'ipv6': False}, (9102, 0, False, "TCP")),
98 ({u'concat': [u'9103', u':', 36000], u'ipv6': True}, (9103, 36000, True, "TCP")),
99 ("9101/TCP:0", (9101, 0, False, "TCP")),
100 ("9101/tcp:0", (9101, 0, False, "TCP")),
101 ("9101/UDP:0", (9101, 0, False, "UDP")),
102 ("9101/udp:0", (9101, 0, False, "UDP")),
103 ("9101:31043", (9101, 31043, False, "TCP")),
104 ("9101/TCP:31043", (9101, 31043, False, "TCP")),
105 ("9101/tcp:31043", (9101, 31043, False, "TCP")),
106 ("9101/UDP:31043", (9101, 31043, False, "UDP")),
107 ("9101/udp:31043", (9101, 31043, False, "UDP"))
123 {u'concat': [u'9101', u':', 3023], u'ipv6': True},
124 {u'concat': [u'9102', u':', 0], u'ipv6': True},
125 {u'concat': [u'9103', u':', 0], u'ipv6': False},
133 expected_port_map = {
134 (9101, "TCP", True): 3023,
135 (9102, "TCP", True): 0,
136 (9103, "TCP", False): 0,
137 (5053, "TCP", False): 5053,
138 (5053, "UDP", False): 5053,
139 (9661, "TCP", False): 19661,
140 (9661, "UDP", False): 19661,
141 (8080, "TCP", False): 8080
144 for test_case in good_ports:
145 container_ports, port_map = parse_ports([test_case["in"]])
146 (cport, hport, ipv6, proto) = test_case["ex"]
147 assert container_ports == [(cport, proto)]
148 assert port_map == {(cport, proto, ipv6): hport}
150 for port in bad_ports:
151 with pytest.raises(ValueError):
154 container_ports, port_map = parse_ports(port_list)
155 assert port_map == expected_port_map
157 def test_create_container():
158 from k8sclient.k8sclient import _create_container_object
159 from kubernetes import client
161 container = _create_container_object("c1","nginx",False, container_ports=[(80, "TCP"), (53, "UDP")])
163 assert container.ports[0].container_port == 80 and container.ports[0].protocol == "TCP"
164 assert container.ports[1].container_port == 53 and container.ports[1].protocol == "UDP"
166 def test_create_probe():
167 from k8sclient.k8sclient import _create_probe
168 from kubernetes import client
171 {"type" : "http", "endpoint" : "/example/health"}
175 {"type" : "docker", "script": "/opt/app/health_check.sh"}
178 for hc in http_checks:
179 probe = _create_probe(hc, 13131)
180 assert probe.http_get.path == hc["endpoint"]
181 assert probe.http_get.scheme == hc["type"].upper()
183 for hc in script_checks:
184 probe = _create_probe(hc, 13131)
185 assert probe._exec.command[0] == hc["script"]