a2ae8a94fea5c541e0efbb2622bbced8e4bc92b2
[vvp/validation-scripts.git] / ice_validator / tests / utils / ports.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
39 #
40
41 from .network_roles import get_network_role_from_port
42 from .vm_types import get_vm_type_for_nova_server
43 import re
44
45
46 def is_valid_ip_address(ip_address, vm_type, network_role, port_property, parameter_type):
47     '''
48     Check the ip_address to make sure it is properly formatted and
49     also contains {vm_type} and {network_role}
50     '''
51
52     allowed_formats = [
53                       ["allowed_address_pairs", "string", "internal",
54                        re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')],
55                       ["allowed_address_pairs", "string", "internal",
56                        re.compile(r'(.+?)_int_(.+?)_floating_ip')],
57                       ["allowed_address_pairs", "string", "external",
58                        re.compile(r'(.+?)_floating_v6_ip')],
59                       ["allowed_address_pairs", "string", "external",
60                        re.compile(r'(.+?)_floating_ip')],
61                       ["allowed_address_pairs", "string", "internal",
62                        re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
63                       ["allowed_address_pairs", "string", "internal",
64                        re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
65                       ["allowed_address_pairs", "string", "external",
66                        re.compile(r'(.+?)_v6_ip_\d+')],
67                       ["allowed_address_pairs", "string", "external",
68                        re.compile(r'(.+?)_ip_\d+')],
69                       ["allowed_address_pairs", "comma_delimited_list",
70                        "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')],
71                       ["allowed_address_pairs", "comma_delimited_list",
72                        "internal", re.compile(r'(.+?)_int_(.+?)_ips')],
73                       ["allowed_address_pairs", "comma_delimited_list",
74                        "external", re.compile(r'(.+?)_v6_ips')],
75                       ["allowed_address_pairs", "comma_delimited_list",
76                        "external", re.compile(r'(.+?)_ips')],
77                       ["fixed_ips", "string", "internal",
78                        re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
79                       ["fixed_ips", "string", "internal",
80                        re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
81                       ["fixed_ips", "string", "external",
82                        re.compile(r'(.+?)_v6_ip_\d+')],
83                       ["fixed_ips", "string", "external",
84                        re.compile(r'(.+?)_ip_\d+')],
85                       ["fixed_ips", "comma_delimited_list", "internal",
86                        re.compile(r'(.+?)_int_(.+?)_v6_ips')],
87                       ["fixed_ips", "comma_delimited_list", "internal",
88                        re.compile(r'(.+?)_int_(.+?)_ips')],
89                       ["fixed_ips", "comma_delimited_list", "external",
90                        re.compile(r'(.+?)_v6_ips')],
91                       ["fixed_ips", "comma_delimited_list", "external",
92                        re.compile(r'(.+?)_ips')]]
93
94     for v3 in allowed_formats:
95         if v3[1] != parameter_type:
96             continue
97         if v3[0] != port_property:
98             continue
99         # check if pattern matches
100         m = v3[3].match(ip_address)
101         if m:
102             if (v3[2] == "internal" and
103                     len(m.groups()) > 1):
104                     return m.group(1) == vm_type and\
105                         m.group(2) == network_role
106             elif (v3[2] == "external" and
107                   len(m.groups()) > 0):
108                 return m.group(1) == vm_type + "_" + network_role
109
110     return False
111
112
113 def get_invalid_ip_addresses(resources, port_property, parameters):
114     '''
115     Get a list of valid ip addresses for a heat resources section
116     '''
117     invalid_ip_addresses = []
118
119     for k, v in resources.items():
120         if not isinstance(v, dict):
121             continue
122         if 'type' not in v:
123             continue
124         if v['type'] not in 'OS::Nova::Server':
125             continue
126         if 'properties' not in v:
127             continue
128         if 'networks' not in v['properties']:
129             continue
130
131         port_resource = None
132
133         vm_type = get_vm_type_for_nova_server(v)
134         if not vm_type:
135             continue
136
137         # get all ports associated with the nova server
138         properties = v['properties']
139         for network in properties['networks']:
140             for k3, v3 in network.items():
141                 if k3 != 'port':
142                     continue
143                 if not isinstance(v3, dict):
144                     continue
145
146                 if 'get_resource' in v3:
147                     port_id = v3['get_resource']
148                     if not resources[port_id]:
149                         continue
150                     port_resource = resources[port_id]
151                 else:
152                     continue
153
154                 network_role = get_network_role_from_port(port_resource)
155                 if not network_role:
156                     continue
157
158                 for k1, v1 in port_resource["properties"].items():
159                     if k1 != port_property:
160                         continue
161                     for v2 in v1:
162                         if "ip_address" not in v2:
163                             continue
164                         if "get_param" not in v2["ip_address"]:
165                             continue
166                         ip_address = v2["ip_address"]["get_param"]
167
168                         if isinstance(ip_address, list):
169                             ip_address = ip_address[0]
170
171                         if ip_address not in parameters:
172                             continue
173
174                         parameter_type = parameters[ip_address].get("type")
175                         if not parameter_type:
176                             continue
177
178                         valid_ip_address = is_valid_ip_address(ip_address,
179                                                                vm_type,
180                                                                network_role,
181                                                                port_property,
182                                                                parameter_type)
183
184                         if not valid_ip_address:
185                             invalid_ip_addresses.append(ip_address)
186
187     return invalid_ip_addresses
188
189
190 def is_reserved_port(port_id):
191     '''
192     Checks to see if the resource id for a port follows
193     the reserve port concept
194     '''
195     formats = [
196               ["port_id",
197                re.compile(r'reserve_port_(.+?)_floating_ip_\d+')],
198               ["port_id",
199                re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')]]
200     for f in formats:
201         m = f[1].match(port_id.lower())
202         if m and m.group(1):
203             return True
204     return False