# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# ============LICENSE_END============================================
#
#
+from tests.structures import Heat
+from tests.helpers import parameter_type_to_heat_type, prop_iterator
+from . import nested_dict
-from .network_roles import get_network_role_and_type
-from .vm_types import get_vm_type_for_nova_server
-import re
+AAP_EXEMPT_CAVEAT = (
+ "If this VNF is not able to adhere to this requirement, please consult the Heat "
+ "Orchestration Template guidelines for more information. If you are knowingly "
+ "violating this requirement after reading the guidelines, then add the parameter "
+ "to the aap_exempt list under this resources metadata to suppress this warning."
+)
-def is_valid_ip_address(
- ip_address, vm_type, network_role, port_property, parameter_type, network_type
-):
- """
- Check the ip_address to make sure it is properly formatted and
- also contains {vm_type} and {network_role}
- """
- allowed_formats = [
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_floating_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "external",
- re.compile(r"(.+?)_floating_v6_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "external",
- re.compile(r"(.+?)_floating_ip"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"),
- ],
- [
- "allowed_address_pairs",
- "string",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ip_\d+"),
- ],
- ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
- ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "external",
- re.compile(r"(.+?)_v6_ips"),
- ],
- [
- "allowed_address_pairs",
- "comma_delimited_list",
- "external",
- re.compile(r"(.+?)_ips"),
- ],
- ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")],
- ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")],
- ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")],
- ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")],
- [
- "fixed_ips",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_v6_ips"),
- ],
- [
- "fixed_ips",
- "comma_delimited_list",
- "internal",
- re.compile(r"(.+?)_int_(.+?)_ips"),
- ],
- ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")],
- ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")],
- ]
-
- for v3 in allowed_formats:
- if v3[1] != parameter_type:
- continue
- if v3[0] != port_property:
- continue
- if v3[2] != network_type:
- continue
- # check if pattern matches
- m = v3[3].match(ip_address)
- if m:
- if v3[2] == "internal" and len(m.groups()) > 1:
- return m.group(1) == vm_type and m.group(2) == network_role
- elif v3[2] == "external" and len(m.groups()) > 0:
- return m.group(1) == vm_type + "_" + network_role
+def get_aap_exemptions(resource_props):
+ """
+ Gets the list of parameters that the Heat author has exempted from following
+ the naming conventions associated with AAP.
- return False
+ :param resource_props: dict of properties under the resource ID
+ :return: list of all parameters to exempt or an empty list
+ """
+ metadata = resource_props.get("metadata") or {}
+ return metadata.get("aap_exempt") or []
-def get_invalid_ip_addresses(resources, port_property, parameters):
+def check_parameter_format(
+ yaml_file, regx, intext, resource_processor, *properties, exemptions_allowed=False
+):
"""
- Get a list of valid ip addresses for a heat resources section
+ yaml_file: input file to check
+ regx: dictionary containing the regex to use to validate parameter
+ intext: internal or external
+ resource_processor: resource type specific helper, defined in structures.py
+ properties: arg list of property that is being checked
+ exemptions_allowed: If True, then parameters in the aap_exempt list are allowed to
+ not follow the rules
"""
- invalid_ip_addresses = []
- for k, v in resources.items():
- if not isinstance(v, dict):
- continue
- if "type" not in v:
- continue
- if v["type"] not in "OS::Nova::Server":
- continue
- if "properties" not in v:
- continue
- if "networks" not in v["properties"]:
+ invalid_parameters = []
+ heat = Heat(filepath=yaml_file)
+ resource_type = resource_processor.resource_type
+ resources = heat.get_resource_by_type(resource_type)
+ heat_parameters = heat.parameters
+ for rid, resource in resources.items():
+ resource_intext, port_match = resource_processor.get_rid_match_tuple(rid)
+ if not port_match:
+ continue # port resource ID not formatted correctely
+
+ if (
+ resource_intext != intext
+ ): # skipping if type (internal/external) doesn't match
continue
- port_resource = None
+ for param in prop_iterator(resource, *properties):
+ if (
+ param
+ and isinstance(param, dict)
+ and "get_resource" not in param
+ and "get_attr" not in param
+ ):
+ # checking parameter uses get_param
+ parameter = param.get("get_param")
+ if not parameter:
+ msg = (
+ "Unexpected parameter format for {} {} property {}: {}. "
+ "Please consult the heat guidelines documentation for details."
+ ).format(resource_type, rid, properties, param)
+ invalid_parameters.append(msg) # should this be a failure?
+ continue
- vm_type = get_vm_type_for_nova_server(v)
- if not vm_type:
- continue
+ # getting parameter if the get_param uses list, and getting official
+ # HEAT parameter type
+ parameter_type = parameter_type_to_heat_type(parameter)
+ if parameter_type == "comma_delimited_list":
+ parameter = parameter[0]
+ elif parameter_type != "string":
+ continue
- # get all ports associated with the nova server
- properties = v["properties"]
- for network in properties["networks"]:
- for k3, v3 in network.items():
- if k3 != "port":
+ # checking parameter format = parameter type defined in parameters
+ # section
+ heat_parameter_type = nested_dict.get(
+ heat_parameters, parameter, "type"
+ )
+ if not heat_parameter_type or heat_parameter_type != parameter_type:
+ msg = (
+ "{} {} parameter {} defined as type {} "
+ + "is being used as type {} in the heat template"
+ ).format(
+ resource_type,
+ properties,
+ parameter,
+ heat_parameter_type,
+ parameter_type,
+ )
+ invalid_parameters.append(msg) # should this actually be an error?
continue
- if not isinstance(v3, dict):
+
+ if exemptions_allowed and parameter in get_aap_exemptions(resource):
continue
- if "get_resource" in v3:
- port_id = v3["get_resource"]
- if not resources[port_id]:
- continue
- port_resource = resources[port_id]
- else:
+ # if parameter type is not in regx dict, then it is not supported
+ # by automation
+ regx_dict = regx[resource_intext].get(parameter_type)
+ if not regx_dict:
+ msg = (
+ "{} {} {} parameter {} defined as type {} "
+ "which is required by platform data model for proper "
+ "assignment and inventory."
+ ).format(resource_type, rid, properties, parameter, parameter_type)
+ if exemptions_allowed:
+ msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT)
+ invalid_parameters.append(msg)
continue
- network_role, network_type = get_network_role_and_type(port_resource)
- if not network_role or not network_type:
+ # checking if param adheres to guidelines format
+ regexp = regx[resource_intext][parameter_type]["machine"]
+ readable_format = regx[resource_intext][parameter_type]["readable"]
+ match = regexp.match(parameter)
+ if not match:
+ msg = (
+ "{} {} property {} parameter {} does not follow {} "
+ "format {} which is required by platform data model for proper "
+ "assignment and inventory."
+ ).format(
+ resource_type,
+ rid,
+ properties,
+ parameter,
+ resource_intext,
+ readable_format,
+ )
+ if exemptions_allowed:
+ msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT)
+ invalid_parameters.append(msg)
continue
- for k1, v1 in port_resource["properties"].items():
- if k1 != port_property:
- continue
- for v2 in v1:
- if "ip_address" not in v2:
- continue
- if "get_param" not in v2["ip_address"]:
- continue
- ip_address = v2["ip_address"]["get_param"]
-
- if isinstance(ip_address, list):
- ip_address = ip_address[0]
-
- if ip_address not in parameters:
- continue
-
- parameter_type = parameters[ip_address].get("type")
- if not parameter_type:
- continue
-
- valid_ip_address = is_valid_ip_address(
- ip_address,
- vm_type,
- network_role,
- port_property,
- parameter_type,
- network_type,
+ # checking that parameter includes correct vm_type/network_role
+ parameter_checks = regx.get("parameter_to_resource_comparisons", [])
+ for check in parameter_checks:
+ resource_match = port_match.group(check)
+ if (
+ resource_match
+ and not parameter.startswith(resource_match)
+ and parameter.find("_{}_".format(resource_match)) == -1
+ ):
+ msg = (
+ "{0} {1} property {2} parameter "
+ "{3} {4} does match resource {4} {5}"
+ ).format(
+ resource_type,
+ rid,
+ properties,
+ parameter,
+ check,
+ resource_match,
)
+ invalid_parameters.append(msg)
+ continue
- if not valid_ip_address:
- invalid_ip_addresses.append(ip_address)
-
- return invalid_ip_addresses
+ assert not invalid_parameters, "%s" % "\n".join(invalid_parameters)
def get_list_of_ports_attached_to_nova_server(nova_server):