[VVP] updating validation scripts in dublin
[vvp/validation-scripts.git] / ice_validator / tests / utils / ports.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
39 #
40
41 from .network_roles import get_network_role_from_port
42 from .vm_types import get_vm_type_for_nova_server
43 import re
44
45
# Allowed ip_address parameter-name formats, one entry per
# (port property, heat parameter type, network scope, name pattern).
# Order matters: the first pattern that matches decides the result, so the
# more specific forms ("_int_", "_floating_", "_v6_") come before the generic
# forms they overlap with.
_ALLOWED_IP_ADDRESS_FORMATS = (
    (
        "allowed_address_pairs",
        "string",
        "internal",
        re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
    ),
    (
        "allowed_address_pairs",
        "string",
        "internal",
        re.compile(r"(.+?)_int_(.+?)_floating_ip"),
    ),
    (
        "allowed_address_pairs",
        "string",
        "external",
        re.compile(r"(.+?)_floating_v6_ip"),
    ),
    (
        "allowed_address_pairs",
        "string",
        "external",
        re.compile(r"(.+?)_floating_ip"),
    ),
    (
        "allowed_address_pairs",
        "string",
        "internal",
        re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
    ),
    (
        "allowed_address_pairs",
        "string",
        "internal",
        re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
    ),
    ("allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")),
    ("allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")),
    (
        "allowed_address_pairs",
        "comma_delimited_list",
        "internal",
        re.compile(r"(.+?)_int_(.+?)_v6_ips"),
    ),
    (
        "allowed_address_pairs",
        "comma_delimited_list",
        "internal",
        re.compile(r"(.+?)_int_(.+?)_ips"),
    ),
    (
        "allowed_address_pairs",
        "comma_delimited_list",
        "external",
        re.compile(r"(.+?)_v6_ips"),
    ),
    (
        "allowed_address_pairs",
        "comma_delimited_list",
        "external",
        re.compile(r"(.+?)_ips"),
    ),
    ("fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")),
    ("fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")),
    ("fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")),
    ("fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")),
    (
        "fixed_ips",
        "comma_delimited_list",
        "internal",
        re.compile(r"(.+?)_int_(.+?)_v6_ips"),
    ),
    (
        "fixed_ips",
        "comma_delimited_list",
        "internal",
        re.compile(r"(.+?)_int_(.+?)_ips"),
    ),
    ("fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")),
    ("fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")),
)


def is_valid_ip_address(
    ip_address, vm_type, network_role, port_property, parameter_type
):
    """
    Check the ip_address parameter name to make sure it is properly
    formatted and also contains {vm_type} and {network_role}

    :param ip_address: name of the heat parameter supplying the address
    :param vm_type: vm type expected at the start of the name
    :param network_role: network role expected after the vm type
    :param port_property: port property using the parameter, either
                          "fixed_ips" or "allowed_address_pairs"
    :param parameter_type: heat parameter type, either "string" or
                           "comma_delimited_list"
    :return: True if the first format matching the whole name carries the
             expected vm_type and network_role; False if they differ or if
             no format for this port_property/parameter_type matches
    """
    for prop, param_type, scope, pattern in _ALLOWED_IP_ADDRESS_FORMATS:
        if param_type != parameter_type or prop != port_property:
            continue

        # The whole name has to match: a prefix match would accept names
        # with arbitrary trailing text such as "<vm>_<role>_ip_0_extra".
        m = pattern.fullmatch(ip_address)
        if not m:
            continue

        if scope == "internal":
            # internal names keep vm_type and network_role in separate
            # groups, split by the "_int_" marker
            return m.group(1) == vm_type and m.group(2) == network_role

        # external names carry "<vm_type>_<network_role>" in a single group
        return m.group(1) == vm_type + "_" + network_role

    return False
151
152
def _get_ip_address_parameter_names(port_resource, port_property):
    """
    Collect the get_param names used as ip_address under port_property
    of a single port resource; malformed entries are skipped
    """
    properties = port_resource.get("properties")
    if not isinstance(properties, dict):
        return []

    entries = properties.get(port_property)
    if not isinstance(entries, list):
        return []

    names = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        ip_address = entry.get("ip_address")
        # only parameter references are checked; literal values are ignored
        if not isinstance(ip_address, dict) or "get_param" not in ip_address:
            continue

        name = ip_address["get_param"]
        # get_param may be given as a list; its first element is the name
        if isinstance(name, list):
            if not name:
                continue
            name = name[0]
        if isinstance(name, str):
            names.append(name)

    return names


def get_invalid_ip_addresses(resources, port_property, parameters):
    """
    Get a list of invalid ip address parameter names for a heat
    resources section

    For every OS::Nova::Server resource, the ports it attaches through
    get_resource are inspected and each ip_address get_param found under
    port_property is checked with is_valid_ip_address.

    :param resources: mapping of resource id to resource definition
    :param port_property: port property to inspect, e.g. "fixed_ips" or
                          "allowed_address_pairs"
    :param parameters: mapping of parameter name to parameter definition
    :return: list of parameter names that failed validation; a name
             appears once per failing reference
    """
    invalid_ip_addresses = []

    for resource in resources.values():
        if not isinstance(resource, dict):
            continue
        # exact comparison: a substring test would also accept types such
        # as "OS::Nova" or an empty string
        if resource.get("type") != "OS::Nova::Server":
            continue
        properties = resource.get("properties")
        if not isinstance(properties, dict):
            continue
        networks = properties.get("networks")
        if not isinstance(networks, list):
            continue

        vm_type = get_vm_type_for_nova_server(resource)
        if not vm_type:
            continue

        # get all ports associated with the nova server
        for network in networks:
            if not isinstance(network, dict):
                continue
            port = network.get("port")
            if not isinstance(port, dict):
                continue

            # only ports defined in this resources section can be inspected
            port_id = port.get("get_resource")
            if not isinstance(port_id, str):
                continue
            port_resource = resources.get(port_id)
            if not isinstance(port_resource, dict):
                continue

            network_role = get_network_role_from_port(port_resource)
            if not network_role:
                continue

            for ip_address in _get_ip_address_parameter_names(
                port_resource, port_property
            ):
                parameter = parameters.get(ip_address)
                if not isinstance(parameter, dict):
                    continue
                parameter_type = parameter.get("type")
                if not parameter_type:
                    continue

                valid_ip_address = is_valid_ip_address(
                    ip_address,
                    vm_type,
                    network_role,
                    port_property,
                    parameter_type,
                )

                if not valid_ip_address:
                    invalid_ip_addresses.append(ip_address)

    return invalid_ip_addresses
230
231
def get_list_of_ports_attached_to_nova_server(nova_server):
    """
    Get the port references listed in the networks property of a
    nova server

    :param nova_server: nova server resource definition
    :return: list with the get_param value of each port, or its
             get_resource value when get_param is absent or empty;
             networks without such a port reference are skipped
    """
    properties = nova_server.get("properties")
    if not isinstance(properties, dict):
        return []

    networks_list = properties.get("networks")
    if not isinstance(networks_list, list):
        return []

    port_ids = []
    for network in networks_list:
        if not isinstance(network, dict):
            continue
        port = network.get("port")
        # a port that is not a mapping carries no get_param/get_resource
        if not isinstance(port, dict):
            continue

        pid = port.get("get_param") or port.get("get_resource")
        # skip ports referenced some other way instead of reporting None
        if pid:
            port_ids.append(pid)

    return port_ids