[VVP] updating OS::Neutron::Port parameter tests
[vvp/validation-scripts.git] / ice_validator / tests / utils / ports.py
index 89440eb..c005e32 100644 (file)
 # ============LICENSE_END============================================
 #
 #
-
 from .network_roles import get_network_role_and_type
-from .vm_types import get_vm_type_for_nova_server
-import re
-
-
-def is_valid_ip_address(
-    ip_address, vm_type, network_role, port_property, parameter_type, network_type
-):
-    """
-    Check the ip_address to make sure it is properly formatted and
-    also contains {vm_type} and {network_role}
-    """
-
-    allowed_formats = [
-        [
-            "allowed_address_pairs",
-            "string",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_floating_ip"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "external",
-            re.compile(r"(.+?)_floating_v6_ip"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "external",
-            re.compile(r"(.+?)_floating_ip"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
-        ],
-        ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
-        ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
-        [
-            "allowed_address_pairs",
-            "comma_delimited_list",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_v6_ips"),
-        ],
-        [
-            "allowed_address_pairs",
-            "comma_delimited_list",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_ips"),
-        ],
-        [
-            "allowed_address_pairs",
-            "comma_delimited_list",
-            "external",
-            re.compile(r"(.+?)_v6_ips"),
-        ],
-        [
-            "allowed_address_pairs",
-            "comma_delimited_list",
-            "external",
-            re.compile(r"(.+?)_ips"),
-        ],
-        ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
-        ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
-        ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
-        ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
-        [
-            "fixed_ips",
-            "comma_delimited_list",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_v6_ips"),
-        ],
-        [
-            "fixed_ips",
-            "comma_delimited_list",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_ips"),
-        ],
-        ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
-        ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
-    ]
-
-    for v3 in allowed_formats:
-        if v3[1] != parameter_type:
-            continue
-        if v3[0] != port_property:
-            continue
-        if v3[2] != network_type:
-            continue
-        # check if pattern matches
-        m = v3[3].match(ip_address)
-        if m:
-            if v3[2] == "internal" and len(m.groups()) > 1:
-                return m.group(1) == vm_type and m.group(2) == network_role
-            elif v3[2] == "external" and len(m.groups()) > 0:
-                return m.group(1) == vm_type + "_" + network_role
+from tests.structures import Heat, NeutronPortProcessor
+from tests.helpers import parameter_type_to_heat_type
+from . import nested_dict
 
-    return False
 
-
-def get_invalid_ip_addresses(resources, port_property, parameters):
+def check_ip_format(yaml_file, regx, port_type, resource_property, nested_property):
     """
-    Get a list of valid ip addresses for a heat resources section
+    yaml_file: input file to check
+    regx: dictionary containing the regex to use to validate parameter
+    port_type: internal or external
+    resource_property: OS::Neutron::Port property to check for parameter
+    nested_property: resource_property will be a list of dicts, this is the key to index into
     """
-    invalid_ip_addresses = []
-
-    for k, v in resources.items():
-        if not isinstance(v, dict):
-            continue
-        if "type" not in v:
+    invalid_ips = []
+    heat = Heat(filepath=yaml_file)
+    ports = heat.get_resource_by_type("OS::Neutron::Port")
+    heat_parameters = heat.parameters
+
+    for rid, resource in ports.items():
+        network_role, network_type = get_network_role_and_type(resource)
+        if (
+            network_type != port_type
+        ):  # skipping if port type (internal/external) doesn't match
             continue
-        if v["type"] not in "OS::Nova::Server":
-            continue
-        if "properties" not in v:
-            continue
-        if "networks" not in v["properties"]:
-            continue
-
-        port_resource = None
-
-        vm_type = get_vm_type_for_nova_server(v)
-        if not vm_type:
-            continue
-
-        # get all ports associated with the nova server
-        properties = v["properties"]
-        for network in properties["networks"]:
-            for k3, v3 in network.items():
-                if k3 != "port":
-                    continue
-                if not isinstance(v3, dict):
-                    continue
-
-                if "get_resource" in v3:
-                    port_id = v3["get_resource"]
-                    if not resources[port_id]:
-                        continue
-                    port_resource = resources[port_id]
-                else:
-                    continue
 
-                network_role, network_type = get_network_role_and_type(port_resource)
-                if not network_role or not network_type:
+        name, port_match = NeutronPortProcessor.get_rid_match_tuple(rid)
+        if not port_match:
+            continue  # port resource ID not formatted correctely
+
+        params = nested_dict.get(resource, "properties", resource_property, default={})
+
+        for param in params:
+            prop = nested_dict.get(param, nested_property)
+            if (
+                not prop
+                or not isinstance(prop, dict)
+                or "get_resource" in prop
+                or "get_attr" in prop
+                # or "str_replace" in prop - should str_replace be checked?
+            ):
+                continue  # lets only check parameters shall we?
+
+            # checking parameter uses get_param
+            parameter = nested_dict.get(prop, "get_param")
+            if not parameter:
+                msg = (
+                    "Unexpected parameter format for OS::Neutron::Port {} property {}: {}. "
+                    + "Please consult the heat guidelines documentation for details."
+                ).format(rid, resource_property, prop)
+                invalid_ips.append(msg)  # should this be a failure?
+                continue
+
+            # getting parameter if the get_param uses list, and getting official HEAT parameter type
+            parameter_type = parameter_type_to_heat_type(parameter)
+            if parameter_type == "comma_delimited_list":
+                parameter = parameter[0]
+            elif parameter_type != "string":
+                continue
+
+            # checking parameter format = type defined in template
+            heat_parameter_type = nested_dict.get(heat_parameters, parameter, "type")
+            if not heat_parameter_type or heat_parameter_type != parameter_type:
+                msg = (
+                    "OS::Neutron::Port {} parameter {} defined as type {} "
+                    + "is being used as type {} in the heat template"
+                ).format(
+                    resource_property, parameter, heat_parameter_type, parameter_type
+                )
+                invalid_ips.append(msg)
+                continue
+
+            # if parameter type is not in regx dict, then it is not supported by automation
+            regx_dict = regx[port_type].get(parameter_type)
+            if not regx_dict:
+                msg = (
+                    "WARNING: OS::Neutron::Port {} parameter {} defined as type {} "
+                    + "is not supported by platform automation. If this VNF is not able "
+                    + "to adhere to this requirement, please consult the Heat Orchestration "
+                    + "Template guidelines for alternative solutions. If already adhering to "
+                    + "an alternative provided by the heat guidelines, please disregard this "
+                    + "message."
+                ).format(resource_property, parameter, parameter_type)
+                invalid_ips.append(msg)
+                continue
+
+            # checking if param adheres to guidelines format
+            regexp = regx[port_type][parameter_type]["machine"]
+            readable_format = regx[port_type][parameter_type]["readable"]
+            match = regexp.match(parameter)
+            if not match:
+                msg = "{} parameter {} does not follow format {}".format(
+                    resource_property, parameter, readable_format
+                )
+                invalid_ips.append(msg)
+                continue
+
+            # checking that parameter includes correct vm_type/network_role
+            parameter_checks = regx.get("parameter_to_resource_comparisons", [])
+            for check in parameter_checks:
+                resource_match = port_match.group(check)
+                if (
+                    resource_match
+                    and not parameter.startswith(resource_match)
+                    and parameter.find("_{}_".format(resource_match)) == -1
+                ):
+                    msg = (
+                        "OS::Neutron::Port {0} property {1} parameter "
+                        + "{2} {3} does match resource {3} {4}"
+                    ).format(rid, resource_property, parameter, check, resource_match)
+                    invalid_ips.append(msg)
                     continue
 
-                for k1, v1 in port_resource["properties"].items():
-                    if k1 != port_property:
-                        continue
-                    for v2 in v1:
-                        if "ip_address" not in v2:
-                            continue
-                        if "get_param" not in v2["ip_address"]:
-                            continue
-                        ip_address = v2["ip_address"]["get_param"]
-
-                        if isinstance(ip_address, list):
-                            ip_address = ip_address[0]
-
-                        if ip_address not in parameters:
-                            continue
-
-                        parameter_type = parameters[ip_address].get("type")
-                        if not parameter_type:
-                            continue
-
-                        valid_ip_address = is_valid_ip_address(
-                            ip_address,
-                            vm_type,
-                            network_role,
-                            port_property,
-                            parameter_type,
-                            network_type,
-                        )
-
-                        if not valid_ip_address:
-                            invalid_ip_addresses.append(ip_address)
-
-    return invalid_ip_addresses
+    assert not invalid_ips, "%s" % "\n".join(invalid_ips)
 
 
 def get_list_of_ports_attached_to_nova_server(nova_server):