[VVP] fixing relative imports for VVP
[vvp/validation-scripts.git] / ice_validator / tests / utils / ports.py
index e479201..d57625d 100644 (file)
@@ -2,7 +2,7 @@
 # ============LICENSE_START=======================================================
 # org.onap.vvp/validation-scripts
 # ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
 # ===================================================================
 #
 # Unless otherwise specified, all software contained herein is licensed
 #
 # ============LICENSE_END============================================
 #
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
 #
+from tests.structures import Heat
+from tests.helpers import parameter_type_to_heat_type, prop_iterator
+from tests.utils import nested_dict
 
-from .network_roles import get_network_role_from_port
-from .vm_types import get_vm_type_for_nova_server
-import re
 
+AAP_EXEMPT_CAVEAT = (
+    "If this VNF is not able to adhere to this requirement, please consult the Heat "
+    "Orchestration Template guidelines for more information. If you are knowingly "
+    "violating this requirement after reading the guidelines, then add the parameter "
+    "to the aap_exempt list under this resources metadata to suppress this warning."
+)
 
-def is_valid_ip_address(ip_address, vm_type, network_role, port_property):
-    """
-    Check the ip_address to make sure it is properly formatted and
-    also contains {vm_type} and {network_role}
-    """
 
-    allowed_formats = [
-        [
-            "allowed_address_pairs",
-            "string",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_floating_ip"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "external",
-            re.compile(r"(.+?)_floating_v6_ip"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "external",
-            re.compile(r"(.+?)_floating_ip"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
-        ],
-        [
-            "allowed_address_pairs",
-            "string",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
-        ],
-        ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
-        ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
-        [
-            "allowed_address_pairs",
-            "comma_delimited_list",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_v6_ips"),
-        ],
-        [
-            "allowed_address_pairs",
-            "comma_delimited_list",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_ips"),
-        ],
-        [
-            "allowed_address_pairs",
-            "comma_delimited_list",
-            "external",
-            re.compile(r"(.+?)_v6_ips"),
-        ],
-        [
-            "allowed_address_pairs",
-            "comma_delimited_list",
-            "external",
-            re.compile(r"(.+?)_ips"),
-        ],
-        ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
-        ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
-        ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
-        ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
-        [
-            "fixed_ips",
-            "comma_delimited_list",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_v6_ips"),
-        ],
-        [
-            "fixed_ips",
-            "comma_delimited_list",
-            "internal",
-            re.compile(r"(.+?)_int_(.+?)_ips"),
-        ],
-        ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
-        ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
-    ]
-
-    for v3 in allowed_formats:
-        if v3[0] != port_property:
-            continue
-        # check if pattern matches
-        m = v3[3].match(ip_address)
-        if m:
-            if v3[2] == "internal" and len(m.groups()) > 1:
-                return m.group(1) == vm_type and m.group(2) == network_role
-            elif v3[2] == "external" and len(m.groups()) > 0:
-                return m.group(1) == vm_type + "_" + network_role
+def get_aap_exemptions(resource_props):
+    """
+    Gets the list of parameters that the Heat author has exempted from following
+    the naming conventions associated with AAP.
 
-    return False
+    :param resource_props: dict of properties under the resource ID
+    :return: list of all parameters to exempt or an empty list
+    """
+    metadata = resource_props.get("metadata") or {}
+    return metadata.get("aap_exempt") or []
 
 
-def get_invalid_ip_addresses(resources, port_property):
+def check_parameter_format(
+    yaml_file, regx, intext, resource_processor, *properties, exemptions_allowed=False
+):
     """
-    Get a list of valid ip addresses for a heat resources section
+    yaml_file: input file to check
+    regx: dictionary containing the regex to use to validate parameter
+    intext: internal or external
+    resource_processor: resource type specific helper, defined in structures.py
+    properties: arg list of property that is being checked
+    exemptions_allowed: If True, then parameters in the aap_exempt list are allowed to
+                        not follow the rules
     """
-    invalid_ip_addresses = []
-
-    for k, v in resources.items():
-        if not isinstance(v, dict):
-            continue
-        if "type" not in v:
-            continue
-        if v["type"] not in "OS::Nova::Server":
-            continue
-        if "properties" not in v:
-            continue
-        if "networks" not in v["properties"]:
-            continue
 
-        port_resource = None
-
-        vm_type = get_vm_type_for_nova_server(v)
-        if not vm_type:
+    invalid_parameters = []
+    heat = Heat(filepath=yaml_file)
+    resource_type = resource_processor.resource_type
+    resources = heat.get_resource_by_type(resource_type)
+    for rid, resource in resources.items():
+        resource_intext, port_match = resource_processor.get_rid_match_tuple(rid)
+        if not port_match:
+            continue  # port resource ID not formatted correctely
+
+        if (
+            resource_intext != intext
+        ):  # skipping if type (internal/external) doesn't match
             continue
-
-        # get all ports associated with the nova server
-        properties = v["properties"]
-        for network in properties["networks"]:
-            for k3, v3 in network.items():
-                if k3 != "port":
-                    continue
-                if not isinstance(v3, dict):
-                    continue
-
-                if "get_resource" in v3:
-                    port_id = v3["get_resource"]
-                    if not resources[port_id]:
-                        continue
-                    port_resource = resources[port_id]
+        for param in prop_iterator(resource, *properties):
+            if (
+                param
+                and isinstance(param, dict)
+                and "get_resource" not in param
+                and "get_attr" not in param
+            ):
+                template_parameters = []
+                if "str_replace" in param:
+                    # print(param)
+                    template_parameters.extend(
+                        v
+                        for k, v in nested_dict.get(
+                            param, "str_replace", "params", default={}
+                        ).items()
+                    )
                 else:
-                    continue
-
-                network_role = get_network_role_from_port(port_resource)
-                if not network_role:
-                    continue
-
-                for k1, v1 in port_resource["properties"].items():
-                    if k1 != port_property:
-                        continue
-                    for v2 in v1:
-                        if "ip_address" not in v2:
-                            continue
-                        if "get_param" not in v2["ip_address"]:
-                            continue
-
-                        ip_address = v2["ip_address"]["get_param"]
-
-                        if isinstance(ip_address, list):
-                            ip_address = ip_address[0]
-
-                        valid_ip_address = is_valid_ip_address(
-                            ip_address, vm_type, network_role, port_property
-                        )
-
-                        if not valid_ip_address:
-                            invalid_ip_addresses.append(ip_address)
-
-    return invalid_ip_addresses
+                    template_parameters.append(param)
+
+                invalid_template_parameters = []
+                for template_parameter in template_parameters:
+                    # Looping through each parameter to check
+                    # the only case where there can be more than 1 is
+                    # if using str_replace
+                    msg = validate_port_parameter(
+                        resource_type,
+                        rid,
+                        properties,
+                        template_parameter,
+                        resource_intext,
+                        resource,
+                        regx,
+                        port_match,
+                        exemptions_allowed,
+                    )
+
+                    if not msg:
+                        # if we found a valid parameter then
+                        # reset invalide_template_parameters
+                        # and break out of loop
+                        invalid_template_parameters = []
+                        break
+                    else:
+                        # haven't found a valid parameter yet
+                        invalid_template_parameters.append(msg)
+
+                invalid_parameters.extend(x for x in invalid_template_parameters)
+
+    assert not invalid_parameters, "%s" % "\n".join(invalid_parameters)
+
+
+def validate_port_parameter(
+    resource_type,
+    rid,
+    properties,
+    param,
+    resource_intext,
+    resource,
+    regx,
+    port_match,
+    exemptions_allowed,
+):
+    """
+    Performs 4 validations
 
+    1) param actually uses get_param
+    2) parameter_type + network_type (internal/external) is a valid combination
+    3) parameter format matches expected format from input dictionary
+    4) the vm_type or network role from resource matches parameter
 
-def is_reserved_port(port_id):
-    """
-    Checks to see if the resource id for a port follows
-    the reserve port concept
+    If the parameter is present in the resource metadata
+    and exemptions are allowed, then the validation will be skipped.
     """
-    formats = [
-        ["port_id", re.compile(r"reserve_port_(.+?)_floating_ip_\d+")],
-        ["port_id", re.compile(r"reserve_port_(.+?)_floating_v6_ip_\d+")],
-    ]
-    for f in formats:
-        m = f[1].match(port_id.lower())
-        if m and m.group(1):
-            return True
-    return False
+    if isinstance(param, dict) and "get_param" in param:
+        parameter = param.get("get_param")
+    else:
+        return (
+            "Unexpected parameter format for {} {} property {}: {}. "
+            "Please consult the heat guidelines documentation for details."
+        ).format(resource_type, rid, properties, param)
+
+    # getting parameter if the get_param uses list, and getting official
+    # HEAT parameter type
+    parameter_type = parameter_type_to_heat_type(parameter)
+    if parameter_type == "comma_delimited_list":
+        parameter = parameter[0]
+    elif parameter_type != "string":
+        return None
+
+    if exemptions_allowed and parameter in get_aap_exemptions(resource):
+        return None
+
+    # if parameter type is not in regx dict, then it is not supported
+    # by automation
+    regx_dict = regx[resource_intext].get(parameter_type)
+    if not regx_dict:
+        msg = (
+            "{} {} {} parameter {} defined as type {} "
+            "which is required by platform data model for proper "
+            "assignment and inventory."
+        ).format(resource_type, rid, properties, parameter, parameter_type)
+        if exemptions_allowed:
+            msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT)
+        return msg
+
+    msg = validate_parameter_format(
+        regx, parameter_type, resource_intext, parameter, rid, exemptions_allowed
+    )
+    if msg:
+        return msg
+
+    # checking that parameter includes correct vm_type/network_role
+    parameter_checks = regx.get("parameter_to_resource_comparisons", [])
+    for check in parameter_checks:
+        msg = mismatch_resource_and_parameter_attribute(
+            check, port_match, parameter, rid
+        )
+        if msg:
+            return msg
+
+    return None
+
+
+def validate_parameter_format(
+    regx, parameter_type, resource_intext, parameter, rid, exemptions_allowed
+):
+    """Checks if a parameter format matches the expected format
+    from input format dictionary"""
+    msg = None
+    regexp = regx[resource_intext][parameter_type]["machine"]
+    readable_format = regx[resource_intext][parameter_type]["readable"]
+    match = regexp.match(parameter)
+    if not match:
+        msg = (
+            "{} property parameter {} does not follow {} "
+            "format {} which is required by platform data model for proper "
+            "assignment and inventory."
+        ).format(rid, parameter, resource_intext, readable_format)
+        if exemptions_allowed:
+            msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT)
+
+    return msg
+
+
+def mismatch_resource_and_parameter_attribute(check, resource_re_match, parameter, rid):
+    """Compares vm_type or network_role from resource
+    is the same as found in parameter"""
+    resource_match = resource_re_match.group(check)
+    if (
+        resource_match
+        and not parameter.startswith(resource_match)
+        and parameter.find("_{}_".format(resource_match)) == -1
+    ):
+        return ("{0} {1} does not match parameter {2} {1}").format(
+            rid, check, parameter
+        )