Add option to use services with ipv6
[dcaegen2/platform/plugins.git] / k8s / tests / test_k8sclient.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2019 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19
20 # Basic sanity tests for  k8sclient functions
21
22 import pytest
23
24 def test_parse_interval():
25     from k8sclient.k8sclient import _parse_interval
26
27     good_intervals = [{"in": input, "ex": expected}
28         for (input, expected) in [
29             (30, 30),
30             ("30", 30),
31             ("30s", 30),
32             ("2m", 2 * 60),
33             ("2h", 2 * 60 * 60),
34             ("24h", 24 * 60 * 60),
35             (354123, 354123),
36             ("354123", 354123),
37             ("354123s", 354123),
38             (1234567890123456789012345678901234567890,1234567890123456789012345678901234567890),
39             ("1234567890123456789012345678901234567890",1234567890123456789012345678901234567890),
40             ("1234567890123456789012345678901234567890s",1234567890123456789012345678901234567890),
41             ("05s", 5),
42             ("00000000000000000000000000000000005m", 5 * 60)
43         ]
44     ]
45
46     bad_intervals = [
47         -99,
48         "-99",
49         "-99s",
50         "-99m",
51         "-99h",
52         "30d",
53         "30w",
54         "30y",
55         "3 0s",
56         "3 5m",
57         30.0,
58         "30.0s",
59         "30.0m",
60         "30.0h",
61         "a 30s",
62         "30s a",
63         "a 30s a",
64         "a 30",
65         "30 a",
66         "a 30 a",
67         "i want an interval of 30s",
68         "thirty seconds",
69         "30 s",
70         "30 m",
71         "30 h",
72         10E0,
73         "10E0",
74         3.14159,
75         "3.14159s"
76         "3:05",
77         "3m05s",
78         "3seconds",
79         "3S",
80         "1minute",
81         "1stanbul"
82     ]
83
84     for test_case in good_intervals:
85         assert _parse_interval(test_case["in"]) == test_case["ex"]
86
87     for interval in bad_intervals:
88         with pytest.raises(ValueError):
89             _parse_interval(interval)
90
91 def test_parse_ports():
92     from k8sclient.k8sclient import parse_ports
93
94     good_ports = [{"in": input, "ex": expected}
95         for (input, expected) in [
96             ({u'concat': [u'9101', u':', 0], u'ipv6': True}, (9101, 0, True, "TCP")),
97             ({u'concat': [u'9102', u':', 0], u'ipv6': False}, (9102, 0, False, "TCP")),
98             ({u'concat': [u'9103', u':', 36000], u'ipv6': True}, (9103, 36000, True, "TCP")),
99             ("9101/TCP:0", (9101, 0, False, "TCP")),
100             ("9101/tcp:0", (9101, 0, False, "TCP")),
101             ("9101/UDP:0", (9101, 0, False, "UDP")),
102             ("9101/udp:0", (9101, 0, False, "UDP")),
103             ("9101:31043", (9101, 31043, False, "TCP")),
104             ("9101/TCP:31043", (9101, 31043, False, "TCP")),
105             ("9101/tcp:31043", (9101, 31043, False, "TCP")),
106             ("9101/UDP:31043", (9101, 31043, False, "UDP")),
107             ("9101/udp:31043", (9101, 31043, False, "UDP"))
108         ]
109     ]
110
111     bad_ports = [
112         "9101",
113         "9101:",
114         "9101:0x453",
115         "9101:0/udp",
116         "9101/:0",
117         "9101/u:0",
118         "9101/http:404",
119         "9101:-1"
120     ]
121
122     port_list = [
123         {u'concat': [u'9101', u':', 3023], u'ipv6': True},
124         {u'concat': [u'9102', u':', 0], u'ipv6': True},
125         {u'concat': [u'9103', u':', 0], u'ipv6': False},
126         "5053/tcp:5053",
127         "5053/udp:5053",
128         "9661:19661",
129         "9661/udp:19661",
130         "8080/tcp:8080"
131     ]
132
133     expected_port_map = {
134         (9101, "TCP", True): 3023,
135         (9102, "TCP", True): 0,
136         (9103, "TCP", False): 0,
137         (5053, "TCP", False): 5053,
138         (5053, "UDP", False): 5053,
139         (9661, "TCP", False): 19661,
140         (9661, "UDP", False): 19661,
141         (8080, "TCP", False): 8080
142     }
143
144     for test_case in good_ports:
145         container_ports, port_map = parse_ports([test_case["in"]])
146         (cport, hport, ipv6, proto) = test_case["ex"]
147         assert container_ports == [(cport, proto)]
148         assert port_map == {(cport, proto, ipv6): hport}
149
150     for port in bad_ports:
151         with pytest.raises(ValueError):
152             parse_ports([port])
153
154     container_ports, port_map = parse_ports(port_list)
155     assert port_map == expected_port_map
156
157 def test_create_container():
158     from k8sclient.k8sclient import _create_container_object
159     from kubernetes import client
160
161     container = _create_container_object("c1","nginx",False, container_ports=[(80, "TCP"), (53, "UDP")])
162
163     assert container.ports[0].container_port == 80 and container.ports[0].protocol == "TCP"
164     assert container.ports[1].container_port == 53 and container.ports[1].protocol == "UDP"
165
166 def test_create_probe():
167     from k8sclient.k8sclient import _create_probe
168     from kubernetes import client
169
170     http_checks = [
171         {"type" : "http", "endpoint" : "/example/health"}
172     ]
173
174     script_checks = [
175         {"type" : "docker", "script": "/opt/app/health_check.sh"}
176     ]
177
178     for hc in http_checks:
179         probe = _create_probe(hc, 13131)
180         assert probe.http_get.path == hc["endpoint"]
181         assert probe.http_get.scheme == hc["type"].upper()
182
183     for hc in script_checks:
184         probe = _create_probe(hc, 13131)
185         assert probe._exec.command[0] == hc["script"]