[VVP] revert nested resource section
[vvp/validation-scripts.git] / ice_validator / tests / test_neutron_port_addresses.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 #
39
40 """
41 OS::Neutron::Port connecting to external network
42 must have at most one ip_address and at most one v6_ip_address.
43 """
44
45 import collections
46 import os.path
47
48 import pytest
49
50 from .structures import Heat
51 from .helpers import validates
52
53 VERSION = "1.1.0"
54
55
56 def is_v6_ip(ip_address):
57     if ip_address.find("v6") != -1:
58         return True
59     return False
60
61
62 def get_neutron_ports(heat):
63     """Return dict of resource_id: resource, whose type is
64     OS::Neutron::Port.
65     """
66     return {
67         rid: resource
68         for rid, resource in heat.resources.items()
69         if heat.nested_get(resource, "type") == "OS::Neutron::Port"
70     }
71
72
73 def get_port_addresses(filepath):
74     """Return dict:
75     key is field name, value is dict:
76         key is parameter name, value is dict:
77             key is filepath, value is set of rid
78     """
79     port_addresses = collections.defaultdict(
80         lambda: collections.defaultdict(lambda: collections.defaultdict(set))
81     )
82     heat = Heat(filepath=filepath)
83     basename = os.path.basename(filepath)
84     for rid, port in get_neutron_ports(heat).items():
85         allowed_address_pairs = heat.nested_get(
86             port, "properties", "allowed_address_pairs"
87         )
88         if not isinstance(allowed_address_pairs, list):
89             continue
90         field = "ip_address"
91         for aa_pair in allowed_address_pairs:
92             param = heat.nested_get(aa_pair, field, "get_param")
93             if param is None:
94                 continue
95             else:
96                 param = param[0] if isinstance(param, list) else param
97             port_addresses[field][param][basename].add(rid)
98     return port_addresses
99
100
101 def nested_update(out_dict, in_dict):
102     """Recursively update out_dict from in_dict.
103     """
104     for key, value in in_dict.items():
105         if key not in out_dict:
106             out_dict[key] = value
107         elif isinstance(value, dict) and isinstance(out_dict[key], dict):
108             out_dict[key] = nested_update(out_dict[key], value)
109         elif isinstance(value, set) and isinstance(out_dict[key], set):
110             out_dict[key].update(value)
111         else:
112             out_dict[key] = value
113     return out_dict
114
115
116 def run_test(heat_template, validate):
117     """call validate with allowed_address_pairs
118     """
119     heat = Heat(filepath=heat_template)
120     if not heat.resources:
121         pytest.skip("No resources found")
122
123     neutron_ports = get_neutron_ports(heat)
124     if not neutron_ports:
125         pytest.skip("No OS::Neutron::Port resources found")
126
127     bad = {}
128     for rid, resource in neutron_ports.items():
129         if rid.startswith("int_"):
130             continue
131         allowed_address_pairs = heat.nested_get(
132             resource, "properties", "allowed_address_pairs"
133         )
134         if allowed_address_pairs is None:
135             continue
136         if not isinstance(allowed_address_pairs, list):
137             bad[rid] = "properties.allowed_address_pairs must be a list."
138             continue
139         error = validate(heat, allowed_address_pairs)
140         if error:
141             bad[rid] = error
142             break
143     if bad:
144         # raise RuntimeError(
145         raise AssertionError(
146             "Bad OS::Neutron::Port: %s"
147             % (", ".join("%s: %s" % (rid, error) for rid, error in bad.items()))
148         )
149
150
151 def validate_field(heat, allowed_address_pairs, field, v6=False):
152     """ensure at most one `field` is found in `allowed_address_pairs'
153     validate allowed_addrfess_pairs as well.
154     Returns error message string or None.
155     """
156     error = None
157     ports = set()
158     port_type = "ipv6" if v6 else "ipv4"
159     for allowed_address_pair in allowed_address_pairs:
160         if not isinstance(allowed_address_pair, dict):
161             error = 'allowed_address_pair "%s" is not a dict' % (allowed_address_pair)
162             break
163         if field in allowed_address_pair:
164             param = heat.nested_get(allowed_address_pair, field, "get_param")
165             if param is None:
166                 # error = 'allowed_address_pair %s requires "get_param"' % field
167                 break
168             else:
169                 # if v6 and testing v6, or inverse
170                 param = param[0] if isinstance(param, list) else param
171                 if v6 == is_v6_ip(param):
172                     ports.add(param)
173     if error is None and len(ports) > 1:
174         error = 'More than one %s "%s" found in allowed_address_pairs: %s' % (
175             port_type,
176             field,
177             list(ports),
178         )
179     return error
180
181
182 def validate_external_ipaddress(heat, allowed_address_pairs):
183     """ensure allowed_address_pairs has at most one ip_address
184     Returns error message string or None.
185     """
186     return validate_field(heat, allowed_address_pairs, "ip_address")
187
188
189 def validate_external_ipaddress_v6(heat, allowed_address_pairs):
190     """ensure allowed_address_pairs has at most one v6_ip_address
191     Returns error message string or None.
192     """
193     return validate_field(heat, allowed_address_pairs, "ip_address", v6=True)
194
195
196 # pylint: disable=invalid-name
197
198
199 @validates("R-91810")
200 def test_neutron_port_external_ipaddress(yaml_file):
201     """
202     If a VNF requires ONAP to assign a Virtual IP (VIP) Address to
203     ports connected an external network, the port
204     **MUST NOT** have more than one IPv4 VIP address.
205     """
206     run_test(yaml_file, validate_external_ipaddress)
207
208
209 @validates("R-41956")
210 def test_neutron_port_external_ipaddress_v6(yaml_file):
211     """
212     If a VNF requires ONAP to assign a Virtual IP (VIP) Address to
213     ports connected an external network, the port
214     **MUST NOT** have more than one IPv6 VIP address.
215     """
216     run_test(yaml_file, validate_external_ipaddress_v6)
217
218
219 @validates("R-10754")
220 def test_neutron_port_floating(yaml_files):
221     """
222     If a VNF has two or more ports that
223     attach to an external network that require a Virtual IP Address (VIP),
224     and the VNF requires ONAP automation to assign the IP address,
225     all the Virtual Machines using the VIP address **MUST**
226     be instantiated in the same Base Module Heat Orchestration Template
227     or in the same Incremental Module Heat Orchestration Template.
228     """
229     fields = {}
230     for filepath in yaml_files:
231         fields = nested_update(fields, get_port_addresses(filepath))
232     bad = []
233     for field, params in fields.items():
234         for param, files in params.items():
235             if len(files) > 1:
236                 error = ["{} {} assigned in multiple templates: ".format(field, param)]
237                 for file_name, r_ids in files.items():
238                     error.append(
239                         "In {} it's assigned to {}. ".format(
240                             file_name, ", ".join(r_ids)
241                         )
242                     )
243                 bad.append("".join(error))
244     assert not bad, "; ".join(bad)