[VVP] add bug fixes and reserve port updates
[vvp/validation-scripts.git] / ice_validator / tests / utils / ports.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the “License”);
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
39 #
40
41 from .network_roles import get_network_role_from_port
42 from .vm_types import get_vm_type_for_nova_server
43 import re
44
45
# Suffix patterns that may follow the {vm_type}_int_{network_role} or
# {vm_type}_{network_role} prefix of an ip address parameter name,
# keyed by the neutron port property the parameter is used in.
# Compiled once at import time instead of on every call.
_IP_ADDRESS_SUFFIXES = {
    "allowed_address_pairs": (
        re.compile(r'_floating_v6_ip'),
        re.compile(r'_floating_ip'),
        re.compile(r'_v6_ip_\d+'),
        re.compile(r'_ip_\d+'),
        re.compile(r'_v6_ips'),
        re.compile(r'_ips'),
    ),
    "fixed_ips": (
        re.compile(r'_v6_ip_\d+'),
        re.compile(r'_ip_\d+'),
        re.compile(r'_v6_ips'),
        re.compile(r'_ips'),
    ),
}


def is_valid_ip_address(ip_address, vm_type, network_role, port_property):
    '''
    Check the ip_address to make sure it is properly formatted and
    also contains {vm_type} and {network_role}

    ip_address is the parameter name to check, port_property is the
    port property it is used in ("allowed_address_pairs" or
    "fixed_ips").

    The name is accepted when it starts with either
    {vm_type}_int_{network_role} (internal form) or
    {vm_type}_{network_role} (external form), immediately followed by
    one of the suffixes allowed for port_property.  Matching is
    anchored at the start only, so text after the suffix is not
    rejected.

    Returns True when the name is valid, otherwise False.  An unknown
    port_property, or an empty vm_type / network_role, is never valid.
    '''
    suffixes = _IP_ADDRESS_SUFFIXES.get(port_property)
    if not suffixes or not vm_type or not network_role:
        return False

    # Build the expected prefixes from the known vm_type/network_role
    # rather than capturing them out of the name with lazy groups: a
    # vm_type that itself contains "_int_" would otherwise be split at
    # the wrong place and a valid name rejected.
    expected_prefixes = (
        vm_type + "_int_" + network_role,
        vm_type + "_" + network_role,
    )

    for prefix in expected_prefixes:
        if not ip_address.startswith(prefix):
            continue
        remainder = ip_address[len(prefix):]
        # keep trying the other prefix form if no suffix fits this one
        if any(suffix.match(remainder) for suffix in suffixes):
            return True

    return False
110
111
def get_invalid_ip_addresses(resources, port_property):
    '''
    Get a list of invalid ip addresses for a heat resources section

    For every OS::Nova::Server in resources, follow each network
    entry whose port is given via get_resource to the referenced port
    resource, and check the ip_address parameter names found under
    that port's port_property (e.g. "fixed_ips" or
    "allowed_address_pairs") with is_valid_ip_address.

    Returns the list of parameter names that failed the check, in the
    order they were encountered.  Servers, networks, ports and
    entries that do not have the expected shape are skipped.
    '''
    invalid_ip_addresses = []

    for resource in resources.values():
        if not isinstance(resource, dict):
            continue
        # exact comparison: a substring test ("not in") would also let
        # types such as "OS::Nova" or "" through as nova servers
        if resource.get('type') != 'OS::Nova::Server':
            continue
        properties = resource.get('properties')
        if not isinstance(properties, dict):
            continue
        networks = properties.get('networks')
        if not isinstance(networks, list):
            continue

        vm_type = get_vm_type_for_nova_server(resource)
        if not vm_type:
            continue

        # get all ports associated with the nova server
        for network in networks:
            if not isinstance(network, dict):
                continue
            port = network.get('port')
            if not isinstance(port, dict) or 'get_resource' not in port:
                continue

            # a get_resource pointing at a missing resource is skipped
            # instead of raising KeyError
            port_resource = resources.get(port['get_resource'])
            if not port_resource:
                continue

            network_role = get_network_role_from_port(port_resource)
            if not network_role:
                continue

            port_properties = None
            if isinstance(port_resource, dict):
                port_properties = port_resource.get('properties')
            if not isinstance(port_properties, dict):
                continue

            entries = port_properties.get(port_property)
            if not isinstance(entries, list):
                continue

            for entry in entries:
                if not isinstance(entry, dict):
                    continue
                ip_value = entry.get('ip_address')
                # only parameter references are checked, literals are not
                if not isinstance(ip_value, dict):
                    continue
                if 'get_param' not in ip_value:
                    continue

                ip_address = ip_value['get_param']

                # [param_name, index, ...] form: the name comes first
                if isinstance(ip_address, list):
                    if not ip_address:
                        continue
                    ip_address = ip_address[0]

                valid_ip_address = is_valid_ip_address(ip_address,
                                                       vm_type,
                                                       network_role,
                                                       port_property)

                if not valid_ip_address:
                    invalid_ip_addresses.append(ip_address)

    return invalid_ip_addresses
180
181
def is_reserved_port(port_id):
    '''
    Tell whether a port resource id is named according to the
    reserve port concept.

    The id is lower-cased and must begin with
    reserve_port_{name}_floating_ip_{index} or
    reserve_port_{name}_floating_v6_ip_{index}.

    Returns True when one of the two forms matches, otherwise False.
    '''
    reserve_port_patterns = (
        re.compile(r'reserve_port_(.+?)_floating_ip_\d+'),
        re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+'),
    )
    candidate = port_id.lower()
    matches = (pattern.match(candidate) for pattern in reserve_port_patterns)
    return any(
        found is not None and bool(found.group(1)) for found in matches
    )