X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=ice_validator%2Ftests%2Futils%2Fports.py;h=89440eba4dca6118aa1717d7c5dec5a580ef1bb7;hb=3a37d68be925c92c9a540cf589dfbe4c901d7911;hp=4d0b4ca1d959dbd0ba114d99b2225ac539ab4bdb;hpb=d27b72f2341b3b0657b850cc4fc29bb164d8f97a;p=vvp%2Fvalidation-scripts.git diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py index 4d0b4ca..89440eb 100644 --- a/ice_validator/tests/utils/ports.py +++ b/ice_validator/tests/utils/ports.py @@ -6,7 +6,7 @@ # =================================================================== # # Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); +# under the Apache License, Version 2.0 (the "License"); # you may not use this software except in compliance with the License. # You may obtain a copy of the License at # @@ -21,7 +21,7 @@ # # # Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# under the Creative Commons License, Attribution 4.0 Intl. (the "License"); # you may not use this documentation except in compliance with the License. # You may obtain a copy of the License at # @@ -35,96 +35,138 @@ # # ============LICENSE_END============================================ # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. # -from .network_roles import get_network_role_from_port +from .network_roles import get_network_role_and_type from .vm_types import get_vm_type_for_nova_server import re -def is_valid_ip_address(ip_address, vm_type, network_role, port_property): - ''' +def is_valid_ip_address( + ip_address, vm_type, network_role, port_property, parameter_type, network_type +): + """ Check the ip_address to make sure it is properly formatted and also contains {vm_type} and {network_role} - ''' + """ allowed_formats = [ - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_floating_ip')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_floating_v6_ip')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_floating_ip')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_ip_\d+')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_v6_ip_\d+')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_ip_\d+')], - ["allowed_address_pairs", "comma_delimited_list", - "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "internal", re.compile(r'(.+?)_int_(.+?)_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "external", re.compile(r'(.+?)_v6_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "external", re.compile(r'(.+?)_ips')], - ["fixed_ips", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], - ["fixed_ips", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_ip_\d+')], - ["fixed_ips", "string", "external", - re.compile(r'(.+?)_v6_ip_\d+')], - ["fixed_ips", "string", "external", - re.compile(r'(.+?)_ip_\d+')], - ["fixed_ips", "comma_delimited_list", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ips')], - ["fixed_ips", "comma_delimited_list", "internal", - re.compile(r'(.+?)_int_(.+?)_ips')], - ["fixed_ips", "comma_delimited_list", "external", - re.compile(r'(.+?)_v6_ips')], - ["fixed_ips", "comma_delimited_list", "external", - re.compile(r'(.+?)_ips')], - ] + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"), + ], + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_floating_ip"), + ], + [ + "allowed_address_pairs", + "string", + "external", + re.compile(r"(.+?)_floating_v6_ip"), + ], + [ + "allowed_address_pairs", + "string", + "external", + re.compile(r"(.+?)_floating_ip"), + ], + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"), + ], + [ + "allowed_address_pairs", + "string", + "internal", + re.compile(r"(.+?)_int_(.+?)_ip_\d+"), + ], + ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], + ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")], + [ + "allowed_address_pairs", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_v6_ips"), + ], + [ + "allowed_address_pairs", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_ips"), + ], + [ + "allowed_address_pairs", + "comma_delimited_list", + "external", + re.compile(r"(.+?)_v6_ips"), + ], + [ + "allowed_address_pairs", + "comma_delimited_list", + "external", + re.compile(r"(.+?)_ips"), + ], + ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")], + ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")], + ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], + ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")], + [ + "fixed_ips", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_v6_ips"), + ], + [ + "fixed_ips", + "comma_delimited_list", + "internal", + re.compile(r"(.+?)_int_(.+?)_ips"), + ], + ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")], + ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")], + ] for v3 in allowed_formats: + if v3[1] != parameter_type: + continue if v3[0] != port_property: continue + if v3[2] != network_type: + continue # check if pattern matches m = v3[3].match(ip_address) if m: - if (v3[2] == "internal" and - len(m.groups()) > 1): - return m.group(1) == vm_type and\ - m.group(2) == network_role - elif (v3[2] == "external" and - len(m.groups()) > 0): + if v3[2] == "internal" and len(m.groups()) > 1: + return m.group(1) == vm_type and m.group(2) == network_role + elif v3[2] == "external" and len(m.groups()) > 0: return m.group(1) == vm_type + "_" + network_role return False -def get_invalid_ip_addresses(resources, port_property): - ''' +def get_invalid_ip_addresses(resources, port_property, parameters): + """ Get a list of valid ip addresses for a heat resources section - ''' + """ invalid_ip_addresses = [] for k, v in resources.items(): if not isinstance(v, dict): continue - if 'type' not in v: + if "type" not in v: continue - if v['type'] not in 'OS::Nova::Server': + if v["type"] not in "OS::Nova::Server": continue - if 'properties' not in v: + if "properties" not in v: continue - if 'networks' not in v['properties']: + if "networks" not in v["properties"]: continue port_resource = None @@ -134,24 +176,24 @@ def get_invalid_ip_addresses(resources, port_property): continue # get all ports associated with the nova server - properties = v['properties'] - for network in properties['networks']: + properties = v["properties"] + for network in properties["networks"]: for k3, v3 in network.items(): - if k3 != 'port': + if k3 != "port": continue if not isinstance(v3, dict): continue - if 'get_resource' in v3: - port_id = v3['get_resource'] + if "get_resource" in v3: + port_id = v3["get_resource"] if not resources[port_id]: continue port_resource = resources[port_id] else: continue - network_role = get_network_role_from_port(port_resource) - if not network_role: + network_role, network_type = get_network_role_and_type(port_resource) + if not network_role or not network_type: continue for k1, v1 in port_resource["properties"].items(): @@ -162,16 +204,26 @@ def get_invalid_ip_addresses(resources, port_property): continue if "get_param" not in v2["ip_address"]: continue - ip_address = v2["ip_address"]["get_param"] if isinstance(ip_address, list): ip_address = ip_address[0] - valid_ip_address = is_valid_ip_address(ip_address, - vm_type, - network_role, - port_property) + if ip_address not in parameters: + continue + + parameter_type = parameters[ip_address].get("type") + if not parameter_type: + continue + + valid_ip_address = is_valid_ip_address( + ip_address, + vm_type, + network_role, + port_property, + parameter_type, + network_type, + ) if not valid_ip_address: invalid_ip_addresses.append(ip_address) @@ -179,19 +231,17 @@ def get_invalid_ip_addresses(resources, port_property): return invalid_ip_addresses -def is_reserved_port(port_id): - ''' - Checks to see if the resource id for a port follows - the reserve port concept - ''' - formats = [ - ["port_id", - re.compile(r'reserve_port_(.+?)_floating_ip_\d+')], - ["port_id", - re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')], - ] - for f in formats: - m = f[1].match(port_id.lower()) - if m and m.group(1): - return True - return False +def get_list_of_ports_attached_to_nova_server(nova_server): + networks_list = nova_server.get("properties", {}).get("networks") + + port_ids = [] + if networks_list: + for network in networks_list: + network_prop = network.get("port") + if network_prop: + pid = network_prop.get("get_param") + if not pid: + pid = network_prop.get("get_resource") + port_ids.append(pid) + + return port_ids