X-Git-Url: https://gerrit.onap.org/r/gitweb?p=vvp%2Fvalidation-scripts.git;a=blobdiff_plain;f=ice_validator%2Ftests%2Futils%2Fports.py;fp=ice_validator%2Ftests%2Futils%2Fports.py;h=51e920adbd9bcaf3300b36ada0b2dc1ef8eb71d2;hp=0000000000000000000000000000000000000000;hb=cc21b8b08b6dbcec577bfb26ff397ac899da8002;hpb=6755f61e44870ff65de0630a403f7b4ddfea4071 diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py new file mode 100644 index 0000000..51e920a --- /dev/null +++ b/ice_validator/tests/utils/ports.py @@ -0,0 +1,179 @@ +# -*- coding: utf8 -*- +# ============LICENSE_START======================================================= +# org.onap.vvp/validation-scripts +# =================================================================== +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# =================================================================== +# +# Unless otherwise specified, all software contained herein is licensed +# under the Apache License, Version 2.0 (the “License”); +# you may not use this software except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# +# Unless otherwise specified, all documentation contained herein is licensed +# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# you may not use this documentation except in compliance with the License. +# You may obtain a copy of the License at +# +# https://creativecommons.org/licenses/by/4.0/ +# +# Unless required by applicable law or agreed to in writing, documentation +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ============LICENSE_END============================================ +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# + +from .network_roles import get_network_role_from_port +from .vm_types import get_vm_type_for_nova_server +import re + + +def is_valid_ip_address(ip_address, vm_type, network_role, port_property): + ''' + Check the ip_address to make sure it is properly formatted and + also contains {vm_type} and {network_role} + ''' + + allowed_formats = [ + ["allowed_address_pairs", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')], + ["allowed_address_pairs", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_floating_ip')], + ["allowed_address_pairs", "string", "external", + re.compile(r'(.+?)_floating_v6_ip')], + ["allowed_address_pairs", "string", "external", + re.compile(r'(.+?)_floating_ip')], + ["allowed_address_pairs", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], + ["allowed_address_pairs", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_ip_\d+')], + ["allowed_address_pairs", "string", "external", + re.compile(r'(.+?)_v6_ip_\d+')], + ["allowed_address_pairs", "string", "external", + re.compile(r'(.+?)_ip_\d+')], + ["allowed_address_pairs", "comma_delimited_list", + "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')], + ["allowed_address_pairs", "comma_delimited_list", + "internal", re.compile(r'(.+?)_int_(.+?)_ips')], + ["allowed_address_pairs", "comma_delimited_list", + "external", re.compile(r'(.+?)_v6_ips')], + ["allowed_address_pairs", "comma_delimited_list", + "external", re.compile(r'(.+?)_ips')], + ["fixed_ips", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], + ["fixed_ips", "string", "internal", + re.compile(r'(.+?)_int_(.+?)_ip_\d+')], + ["fixed_ips", "string", "external", + re.compile(r'(.+?)_v6_ip_\d+')], + ["fixed_ips", "string", "external", + re.compile(r'(.+?)_ip_\d+')], + ["fixed_ips", "comma_delimited_list", "internal", + re.compile(r'(.+?)_int_(.+?)_v6_ips')], + ["fixed_ips", "comma_delimited_list", "internal", + re.compile(r'(.+?)_int_(.+?)_ips')], + ["fixed_ips", "comma_delimited_list", "external", + re.compile(r'(.+?)_v6_ips')], + ["fixed_ips", "comma_delimited_list", "external", + re.compile(r'(.+?)_ips')], + ] + + for v3 in allowed_formats: + if v3[0] != port_property: + continue + # check if pattern matches + m = v3[3].match(ip_address) + if m: + if (v3[2] == "internal" and + len(m.groups()) > 1): + return m.group(1) == vm_type and\ + m.group(2) == network_role + elif (v3[2] == "external" and + len(m.groups()) > 0): + return m.group(1) == vm_type + "_" + network_role + + return False + + +def get_invalid_ip_addresses(resources, port_property): + ''' + Get a list of valid ip addresses for a heat resources section + ''' + invalid_ip_addresses = [] + + for k, v in resources.items(): + if not isinstance(v, dict): + continue + if 'type' not in v: + continue + if v['type'] not in 'OS::Nova::Server': + continue + if 'properties' not in v: + continue + if 'networks' not in v['properties']: + continue + + port_resource = None + + vm_type = get_vm_type_for_nova_server(v) + if not vm_type: + continue + + # get all ports associated with the nova server + properties = v['properties'] + for network in properties['networks']: + for k3, v3 in network.items(): + if k3 != 'port': + continue + if not isinstance(v3, dict): + continue + + if 'get_resource' in v3: + port_id = v3['get_resource'] + if not resources[port_id]: + continue + port_resource = resources[port_id] + else: + continue + + network_role = get_network_role_from_port(port_resource) + if not network_role: + continue + + for k1, v1 in port_resource["properties"].items(): + if k1 != port_property: + continue + for v2 in v1: + if "ip_address" not in v2: + continue + if "get_param" not in v2["ip_address"]: + continue + + ip_address = v2["ip_address"]["get_param"] + + if isinstance(ip_address, list): + ip_address = ip_address[0] + + valid_ip_address = is_valid_ip_address(ip_address, + vm_type, + network_role, + port_property) + + if not valid_ip_address: + invalid_ip_addresses.append(ip_address) + + return invalid_ip_addresses