[VVP] Added three new reports
[vvp/validation-scripts.git] / ice_validator / tests / utils / ports.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
39 #
40
41 from .network_roles import get_network_role_from_port
42 from .vm_types import get_vm_type_for_nova_server
43 import re
44
45
# Accepted parameter-name formats, as (network scope, regex source) pairs.
# Order matters: the first format that matches decides the result, so the
# more specific "_int_" and "_v6_" variants must precede the generic ones.
_FLOATING_IP_FORMATS = (
    ("internal", r"(.+?)_int_(.+?)_floating_v6_ip"),
    ("internal", r"(.+?)_int_(.+?)_floating_ip"),
    ("external", r"(.+?)_floating_v6_ip"),
    ("external", r"(.+?)_floating_ip"),
)
_INDEXED_AND_LIST_IP_FORMATS = (
    ("internal", r"(.+?)_int_(.+?)_v6_ip_\d+"),
    ("internal", r"(.+?)_int_(.+?)_ip_\d+"),
    ("external", r"(.+?)_v6_ip_\d+"),
    ("external", r"(.+?)_ip_\d+"),
    ("internal", r"(.+?)_int_(.+?)_v6_ips"),
    ("internal", r"(.+?)_int_(.+?)_ips"),
    ("external", r"(.+?)_v6_ips"),
    ("external", r"(.+?)_ips"),
)


def _compile_ip_formats(formats):
    """Compile (scope, regex source) pairs, anchoring each regex at the end."""
    # "\Z" makes the pattern cover the whole name when used with .match(),
    # so trailing text after the expected suffix is rejected.
    return tuple((scope, re.compile(source + r"\Z")) for scope, source in formats)


# Compiled once at import time instead of on every call.
_IP_ADDRESS_FORMATS = {
    "allowed_address_pairs": _compile_ip_formats(
        _FLOATING_IP_FORMATS + _INDEXED_AND_LIST_IP_FORMATS
    ),
    "fixed_ips": _compile_ip_formats(_INDEXED_AND_LIST_IP_FORMATS),
}


def is_valid_ip_address(ip_address, vm_type, network_role, port_property):
    """
    Check the ip_address to make sure it is properly formatted and
    also contains {vm_type} and {network_role}

    :param ip_address: parameter name to validate (e.g. "vm_net_ip_0")
    :param vm_type: vm-type the parameter name is expected to carry
    :param network_role: network-role the parameter name is expected to carry
    :param port_property: port property the parameter is used in; only
        "allowed_address_pairs" and "fixed_ips" have known formats
    :return: True when the whole name matches a known format for
        port_property and the captured vm_type / network_role equal the
        given ones, False otherwise (including unknown port_property)
    """
    for scope, pattern in _IP_ADDRESS_FORMATS.get(port_property, ()):
        m = pattern.match(ip_address)
        if not m:
            continue
        # The first matching format decides; later formats are not consulted.
        if scope == "internal":
            # {vm_type}_int_{network_role}_<suffix>
            return m.group(1) == vm_type and m.group(2) == network_role
        # {vm_type}_{network_role}_<suffix>
        return m.group(1) == vm_type + "_" + network_role

    return False
147
148
def get_invalid_ip_addresses(resources, port_property):
    """
    Get a list of invalid ip address parameter names for a heat
    resources section

    For every OS::Nova::Server resource, each port attached through
    ``networks[].port.get_resource`` is looked up in ``resources`` and the
    ``get_param`` names used for ``ip_address`` under ``port_property`` are
    checked with is_valid_ip_address.

    :param resources: dict of resource id -> resource definition
    :param port_property: port property to inspect
        (e.g. "fixed_ips" or "allowed_address_pairs")
    :return: list of parameter names that failed validation
    """
    invalid_ip_addresses = []

    for resource in resources.values():
        if not isinstance(resource, dict):
            continue
        # Exact comparison: a substring test against "OS::Nova::Server"
        # would also accept types such as "OS::Nova" or "".
        if resource.get("type") != "OS::Nova::Server":
            continue
        properties = resource.get("properties")
        if not isinstance(properties, dict) or "networks" not in properties:
            continue
        networks = properties["networks"]
        if not isinstance(networks, list):
            continue

        vm_type = get_vm_type_for_nova_server(resource)
        if not vm_type:
            continue

        # get all ports associated with the nova server
        for network in networks:
            if not isinstance(network, dict):
                continue
            port_ref = network.get("port")
            if not isinstance(port_ref, dict) or "get_resource" not in port_ref:
                continue

            # A dangling get_resource reference is skipped, not a KeyError.
            port_resource = resources.get(port_ref["get_resource"])
            if not port_resource or not isinstance(port_resource, dict):
                continue

            network_role = get_network_role_from_port(port_resource)
            if not network_role:
                continue

            port_properties = port_resource.get("properties")
            if not isinstance(port_properties, dict):
                continue
            entries = port_properties.get(port_property)
            if not isinstance(entries, list):
                continue

            for entry in entries:
                if not isinstance(entry, dict):
                    continue
                ip_ref = entry.get("ip_address")
                # Only parameter references are checked; literal addresses
                # and other intrinsic functions are ignored.
                if not isinstance(ip_ref, dict) or "get_param" not in ip_ref:
                    continue

                ip_address = ip_ref["get_param"]

                # get_param: [name, index] -> validate the parameter name
                if isinstance(ip_address, list):
                    if not ip_address:
                        continue
                    ip_address = ip_address[0]

                valid_ip_address = is_valid_ip_address(
                    ip_address, vm_type, network_role, port_property
                )

                if not valid_ip_address:
                    invalid_ip_addresses.append(ip_address)

    return invalid_ip_addresses
216
217
# Resource-id formats for reserve ports. "\Z" anchors the end so that
# nothing may follow the numeric index; compiled once at import time.
_RESERVED_PORT_FORMATS = (
    re.compile(r"reserve_port_(.+?)_floating_ip_\d+\Z"),
    re.compile(r"reserve_port_(.+?)_floating_v6_ip_\d+\Z"),
)


def is_reserved_port(port_id):
    """
    Checks to see if the resource id for a port follows
    the reserve port concept

    :param port_id: resource id of the port; compared case-insensitively
    :return: True when the whole id has the form
        reserve_port_<something>_floating_ip_<index> or
        reserve_port_<something>_floating_v6_ip_<index>, False otherwise
    """
    lowered = port_id.lower()
    # (.+?) cannot capture an empty string, so a match alone is sufficient.
    return any(pattern.match(lowered) for pattern in _RESERVED_PORT_FORMATS)