[VVP] ports.py check port type
[vvp/validation-scripts.git] / ice_validator / tests / utils / ports.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 #
39
40 from .network_roles import get_network_role_and_type
41 from .vm_types import get_vm_type_for_nova_server
42 import re
43
44
45 def is_valid_ip_address(
46     ip_address, vm_type, network_role, port_property, parameter_type, network_type
47 ):
48     """
49     Check the ip_address to make sure it is properly formatted and
50     also contains {vm_type} and {network_role}
51     """
52
53     allowed_formats = [
54         [
55             "allowed_address_pairs",
56             "string",
57             "internal",
58             re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
59         ],
60         [
61             "allowed_address_pairs",
62             "string",
63             "internal",
64             re.compile(r"(.+?)_int_(.+?)_floating_ip"),
65         ],
66         [
67             "allowed_address_pairs",
68             "string",
69             "external",
70             re.compile(r"(.+?)_floating_v6_ip"),
71         ],
72         [
73             "allowed_address_pairs",
74             "string",
75             "external",
76             re.compile(r"(.+?)_floating_ip"),
77         ],
78         [
79             "allowed_address_pairs",
80             "string",
81             "internal",
82             re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
83         ],
84         [
85             "allowed_address_pairs",
86             "string",
87             "internal",
88             re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
89         ],
90         ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
91         ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
92         [
93             "allowed_address_pairs",
94             "comma_delimited_list",
95             "internal",
96             re.compile(r"(.+?)_int_(.+?)_v6_ips"),
97         ],
98         [
99             "allowed_address_pairs",
100             "comma_delimited_list",
101             "internal",
102             re.compile(r"(.+?)_int_(.+?)_ips"),
103         ],
104         [
105             "allowed_address_pairs",
106             "comma_delimited_list",
107             "external",
108             re.compile(r"(.+?)_v6_ips"),
109         ],
110         [
111             "allowed_address_pairs",
112             "comma_delimited_list",
113             "external",
114             re.compile(r"(.+?)_ips"),
115         ],
116         ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
117         ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
118         ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
119         ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
120         [
121             "fixed_ips",
122             "comma_delimited_list",
123             "internal",
124             re.compile(r"(.+?)_int_(.+?)_v6_ips"),
125         ],
126         [
127             "fixed_ips",
128             "comma_delimited_list",
129             "internal",
130             re.compile(r"(.+?)_int_(.+?)_ips"),
131         ],
132         ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
133         ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
134     ]
135
136     for v3 in allowed_formats:
137         if v3[1] != parameter_type:
138             continue
139         if v3[0] != port_property:
140             continue
141         if v3[2] != network_type:
142             continue
143         # check if pattern matches
144         m = v3[3].match(ip_address)
145         if m:
146             if v3[2] == "internal" and len(m.groups()) > 1:
147                 return m.group(1) == vm_type and m.group(2) == network_role
148             elif v3[2] == "external" and len(m.groups()) > 0:
149                 return m.group(1) == vm_type + "_" + network_role
150
151     return False
152
153
154 def get_invalid_ip_addresses(resources, port_property, parameters):
155     """
156     Get a list of valid ip addresses for a heat resources section
157     """
158     invalid_ip_addresses = []
159
160     for k, v in resources.items():
161         if not isinstance(v, dict):
162             continue
163         if "type" not in v:
164             continue
165         if v["type"] not in "OS::Nova::Server":
166             continue
167         if "properties" not in v:
168             continue
169         if "networks" not in v["properties"]:
170             continue
171
172         port_resource = None
173
174         vm_type = get_vm_type_for_nova_server(v)
175         if not vm_type:
176             continue
177
178         # get all ports associated with the nova server
179         properties = v["properties"]
180         for network in properties["networks"]:
181             for k3, v3 in network.items():
182                 if k3 != "port":
183                     continue
184                 if not isinstance(v3, dict):
185                     continue
186
187                 if "get_resource" in v3:
188                     port_id = v3["get_resource"]
189                     if not resources[port_id]:
190                         continue
191                     port_resource = resources[port_id]
192                 else:
193                     continue
194
195                 network_role, network_type = get_network_role_and_type(port_resource)
196                 if not network_role or not network_type:
197                     continue
198
199                 for k1, v1 in port_resource["properties"].items():
200                     if k1 != port_property:
201                         continue
202                     for v2 in v1:
203                         if "ip_address" not in v2:
204                             continue
205                         if "get_param" not in v2["ip_address"]:
206                             continue
207                         ip_address = v2["ip_address"]["get_param"]
208
209                         if isinstance(ip_address, list):
210                             ip_address = ip_address[0]
211
212                         if ip_address not in parameters:
213                             continue
214
215                         parameter_type = parameters[ip_address].get("type")
216                         if not parameter_type:
217                             continue
218
219                         valid_ip_address = is_valid_ip_address(
220                             ip_address,
221                             vm_type,
222                             network_role,
223                             port_property,
224                             parameter_type,
225                             network_type,
226                         )
227
228                         if not valid_ip_address:
229                             invalid_ip_addresses.append(ip_address)
230
231     return invalid_ip_addresses
232
233
234 def get_list_of_ports_attached_to_nova_server(nova_server):
235     networks_list = nova_server.get("properties", {}).get("networks")
236
237     port_ids = []
238     if networks_list:
239         for network in networks_list:
240             network_prop = network.get("port")
241             if network_prop:
242                 pid = network_prop.get("get_param")
243                 if not pid:
244                     pid = network_prop.get("get_resource")
245                 port_ids.append(pid)
246
247     return port_ids