1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2019 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
20 # Basic sanity tests for k8sclient functions
24 def test_parse_interval():
25 from k8sclient.k8sclient import _parse_interval
27 good_intervals = [{"in": input, "ex": expected}
28 for (input, expected) in [
34 ("24h", 24 * 60 * 60),
38 (1234567890123456789012345678901234567890,1234567890123456789012345678901234567890),
39 ("1234567890123456789012345678901234567890",1234567890123456789012345678901234567890),
40 ("1234567890123456789012345678901234567890s",1234567890123456789012345678901234567890),
42 ("00000000000000000000000000000000005m", 5 * 60)
67 "i want an interval of 30s",
84 for test_case in good_intervals:
85 assert _parse_interval(test_case["in"]) == test_case["ex"]
87 for interval in bad_intervals:
88 with pytest.raises(ValueError):
89 _parse_interval(interval)
91 def test_parse_ports():
92 from k8sclient.k8sclient import parse_ports
94 good_ports = [{"in": input, "ex": expected}
95 for (input, expected) in [
96 ("9101:0", (9101, 0, "TCP")),
97 ("9101/TCP:0", (9101, 0, "TCP")),
98 ("9101/tcp:0", (9101, 0, "TCP")),
99 ("9101/UDP:0", (9101, 0, "UDP")),
100 ("9101/udp:0", (9101, 0, "UDP")),
101 ("9101:31043", (9101, 31043, "TCP")),
102 ("9101/TCP:31043", (9101, 31043, "TCP")),
103 ("9101/tcp:31043", (9101, 31043, "TCP")),
104 ("9101/UDP:31043", (9101, 31043, "UDP")),
105 ("9101/udp:31043", (9101, 31043, "UDP"))
129 expected_port_map = {
133 (9661,"TCP") : 19661,
134 (9661,"UDP") : 19661,
138 for test_case in good_ports:
139 container_ports, port_map = parse_ports([test_case["in"]])
140 (cport, hport, proto) = test_case["ex"]
141 assert container_ports == [(cport, proto)]
142 assert port_map == {(cport, proto) : hport}
144 for port in bad_ports:
145 with pytest.raises(ValueError):
148 container_ports, port_map = parse_ports(port_list)
149 assert port_map == expected_port_map
def test_create_container():
    """Check that requested ports show up on the created container object.

    A container is built for two (port number, protocol) pairs; the test
    then verifies that ``container.ports`` holds them in the same order,
    with matching ``container_port`` and ``protocol`` attributes.
    """
    from k8sclient.k8sclient import _create_container_object
    from kubernetes import client  # not referenced in this test; import retained as-is

    requested = ((80, "TCP"), (53, "UDP"))
    container = _create_container_object(
        "c1", "nginx", False, container_ports=list(requested)
    )

    # Compare position by position: the order of container.ports must
    # follow the order in which the ports were requested.
    for position, (number, protocol) in enumerate(requested):
        created = container.ports[position]
        assert created.container_port == number and created.protocol == protocol
def test_create_probe():
    """Check the probes built from HTTP and script health-check descriptions.

    For an HTTP check, the probe's ``http_get`` must carry the configured
    endpoint as its path and the upper-cased check type as its scheme.
    For a script ("docker") check, the first element of the probe's exec
    command must be the configured script.
    """
    from k8sclient.k8sclient import _create_probe
    from kubernetes import client  # not referenced in this test; import retained as-is

    # NOTE(review): each list is expected to hold exactly the one entry
    # shown here -- confirm against the project's test data.
    http_checks = [
        {"type": "http", "endpoint": "/example/health"},
    ]
    script_checks = [
        {"type": "docker", "script": "/opt/app/health_check.sh"},
    ]

    for check in http_checks:
        http_get = _create_probe(check, 13131).http_get
        assert http_get.path == check["endpoint"]
        assert http_get.scheme == check["type"].upper()

    for check in script_checks:
        command = _create_probe(check, 13131)._exec.command
        assert command[0] == check["script"]