[VVP] updating validation scripts in dublin
[vvp/validation-scripts.git] / ice_validator / tests / test_neutron_port_addresses.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
39 #
40
41 """
42 OS::Neutron::Port connecting to external network
43 must have at most one ip_address and at most one v6_ip_address.
44 """
45
46 import collections
47 import os.path
48
49 import pytest
50
51 from .structures import Heat
52 from .helpers import validates
53
54 VERSION = "1.1.0"
55
56
57 def is_v6_ip(ip_address):
58     if ip_address.find("v6") != -1:
59         return True
60     return False
61
62
63 def get_neutron_ports(heat):
64     """Return dict of resource_id: resource, whose type is
65     OS::Neutron::Port.
66     """
67     return {
68         rid: resource
69         for rid, resource in heat.resources.items()
70         if heat.nested_get(resource, "type") == "OS::Neutron::Port"
71     }
72
73
74 def get_port_addresses(filepath):
75     """Return dict:
76     key is field name, value is dict:
77         key is parameter name, value is dict:
78             key is filepath, value is set of rid
79     """
80     port_addresses = collections.defaultdict(
81         lambda: collections.defaultdict(lambda: collections.defaultdict(set))
82     )
83     heat = Heat(filepath=filepath)
84     basename = os.path.basename(filepath)
85     for rid, port in get_neutron_ports(heat).items():
86         allowed_address_pairs = heat.nested_get(
87             port, "properties", "allowed_address_pairs"
88         )
89         if not isinstance(allowed_address_pairs, list):
90             continue
91         field = "ip_address"
92         for aa_pair in allowed_address_pairs:
93             param = heat.nested_get(aa_pair, field, "get_param")
94             if param is None:
95                 continue
96             port_addresses[field][param][basename].add(rid)
97     return port_addresses
98
99
100 def nested_update(out_dict, in_dict):
101     """Recursively update out_dict from in_dict.
102     """
103     for key, value in in_dict.items():
104         if key not in out_dict:
105             out_dict[key] = value
106         elif isinstance(value, dict) and isinstance(out_dict[key], dict):
107             out_dict[key] = nested_update(out_dict[key], value)
108         elif isinstance(value, set) and isinstance(out_dict[key], set):
109             out_dict[key].update(value)
110         else:
111             out_dict[key] = value
112     return out_dict
113
114
115 def run_test(heat_template, validate):
116     """call validate with allowed_address_pairs
117     """
118     heat = Heat(filepath=heat_template)
119     if not heat.resources:
120         pytest.skip("No resources found")
121
122     neutron_ports = get_neutron_ports(heat)
123     if not neutron_ports:
124         pytest.skip("No OS::Neutron::Port resources found")
125
126     bad = {}
127     for rid, resource in neutron_ports.items():
128         if rid.startswith("int_"):
129             continue
130         allowed_address_pairs = heat.nested_get(
131             resource, "properties", "allowed_address_pairs"
132         )
133         if allowed_address_pairs is None:
134             continue
135         if not isinstance(allowed_address_pairs, list):
136             bad[rid] = "properties.allowed_address_pairs must be a list."
137             continue
138         error = validate(heat, allowed_address_pairs)
139         if error:
140             bad[rid] = error
141             break
142     if bad:
143         # raise RuntimeError(
144         raise AssertionError(
145             "Bad OS::Neutron::Port: %s"
146             % (", ".join("%s: %s" % (rid, error) for rid, error in bad.items()))
147         )
148
149
150 def validate_field(heat, allowed_address_pairs, field, v6=False):
151     """ensure at most one `field` is found in `allowed_address_pairs'
152     validate allowed_addrfess_pairs as well.
153     Returns error message string or None.
154     """
155     error = None
156     ports = set()
157     port_type = "ipv6" if v6 else "ipv4"
158     for allowed_address_pair in allowed_address_pairs:
159         if not isinstance(allowed_address_pair, dict):
160             error = 'allowed_address_pair "%s" is not a dict' % (allowed_address_pair)
161             break
162         if field in allowed_address_pair:
163             param = heat.nested_get(allowed_address_pair, field, "get_param")
164             if param is None:
165                 error = 'allowed_address_pair %s requires "get_param"' % field
166                 break
167             else:
168                 # if v6 and testing v6, or inverse
169                 if v6 == is_v6_ip(param):
170                     ports.add(param)
171     if error is None and len(ports) > 1:
172         error = 'More than one %s "%s" found in allowed_address_pairs: %s' % (
173             port_type,
174             field,
175             list(ports),
176         )
177     return error
178
179
180 def validate_external_ipaddress(heat, allowed_address_pairs):
181     """ensure allowed_address_pairs has at most one ip_address
182     Returns error message string or None.
183     """
184     return validate_field(heat, allowed_address_pairs, "ip_address")
185
186
187 def validate_external_ipaddress_v6(heat, allowed_address_pairs):
188     """ensure allowed_address_pairs has at most one v6_ip_address
189     Returns error message string or None.
190     """
191     return validate_field(heat, allowed_address_pairs, "ip_address", v6=True)
192
193
194 # pylint: disable=invalid-name
195
196
197 @validates("R-91810")
198 def test_neutron_port_external_ipaddress(heat_template):
199     """
200     If a VNF requires ECOMP to assign a Virtual IP (VIP) Address to
201     ports connected an external network, the port
202     **MUST NOT** have more than one IPv4 VIP address.
203     """
204     run_test(heat_template, validate_external_ipaddress)
205
206
207 @validates("R-41956")
208 def test_neutron_port_external_ipaddress_v6(heat_template):
209     """
210     If a VNF requires ECOMP to assign a Virtual IP (VIP) Address to
211     ports connected an external network, the port
212     **MUST NOT** have more than one IPv6 VIP address.
213     """
214     run_test(heat_template, validate_external_ipaddress_v6)
215
216
217 @validates("R-10754")
218 def test_neutron_port_floating(yaml_files):
219     """
220     If a VNF has two or more ports that
221     attach to an external network that require a Virtual IP Address (VIP),
222     and the VNF requires ECOMP automation to assign the IP address,
223     all the Virtual Machines using the VIP address **MUST**
224     be instantiated in the same Base Module Heat Orchestration Template
225     or in the same Incremental Module Heat Orchestration Template.
226     """
227     fields = {}
228     for filepath in yaml_files:
229         fields = nested_update(fields, get_port_addresses(filepath))
230     bad = []
231     for field, params in fields.items():
232         for param, files in params.items():
233             if len(files) > 1:
234                 bad.append(
235                     '"%s" "%s" in multiple templates: %s'
236                     % (
237                         field,
238                         param,
239                         ", ".join("%s: %s" % (k, list(v)) for k, v in files.items()),
240                     )
241                 )
242     assert not bad, "; ".join(bad)