# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
#
# ============LICENSE_END============================================
#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
#
-
-from .network_roles import get_network_role_from_port
-from .vm_types import get_vm_type_for_nova_server
-import re
-
-
-def is_valid_ip_address(ip_address, vm_type, network_role, port_property):
- '''
- Check the ip_address to make sure it is properly formatted and
- also contains {vm_type} and {network_role}
- '''
-
- allowed_formats = [
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_floating_ip')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_floating_v6_ip')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_floating_ip')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
- ["allowed_address_pairs", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_v6_ip_\d+')],
- ["allowed_address_pairs", "string", "external",
- re.compile(r'(.+?)_ip_\d+')],
- ["allowed_address_pairs", "comma_delimited_list",
- "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "internal", re.compile(r'(.+?)_int_(.+?)_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "external", re.compile(r'(.+?)_v6_ips')],
- ["allowed_address_pairs", "comma_delimited_list",
- "external", re.compile(r'(.+?)_ips')],
- ["fixed_ips", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')],
- ["fixed_ips", "string", "internal",
- re.compile(r'(.+?)_int_(.+?)_ip_\d+')],
- ["fixed_ips", "string", "external",
- re.compile(r'(.+?)_v6_ip_\d+')],
- ["fixed_ips", "string", "external",
- re.compile(r'(.+?)_ip_\d+')],
- ["fixed_ips", "comma_delimited_list", "internal",
- re.compile(r'(.+?)_int_(.+?)_v6_ips')],
- ["fixed_ips", "comma_delimited_list", "internal",
- re.compile(r'(.+?)_int_(.+?)_ips')],
- ["fixed_ips", "comma_delimited_list", "external",
- re.compile(r'(.+?)_v6_ips')],
- ["fixed_ips", "comma_delimited_list", "external",
- re.compile(r'(.+?)_ips')],
- ]
-
- for v3 in allowed_formats:
- if v3[0] != port_property:
- continue
- # check if pattern matches
- m = v3[3].match(ip_address)
- if m:
- if (v3[2] == "internal" and
- len(m.groups()) > 1):
- return m.group(1) == vm_type and\
- m.group(2) == network_role
- elif (v3[2] == "external" and
- len(m.groups()) > 0):
- return m.group(1) == vm_type + "_" + network_role
-
- return False
-
-
-def get_invalid_ip_addresses(resources, port_property):
- '''
- Get a list of valid ip addresses for a heat resources section
- '''
- invalid_ip_addresses = []
-
- for k, v in resources.items():
- if not isinstance(v, dict):
- continue
- if 'type' not in v:
- continue
- if v['type'] not in 'OS::Nova::Server':
- continue
- if 'properties' not in v:
- continue
- if 'networks' not in v['properties']:
+from tests.structures import Heat
+from tests.helpers import parameter_type_to_heat_type, prop_iterator
+from . import nested_dict
+
+
+AAP_EXEMPT_CAVEAT = (
+ "If this VNF is not able to adhere to this requirement, please consult the Heat "
+ "Orchestration Template guidelines for more information. If you are knowingly "
+ "violating this requirement after reading the guidelines, then add the parameter "
+ "to the aap_exempt list under this resources metadata to suppress this warning."
+)
+
+
+def get_aap_exemptions(resource_props):
+ """
+ Gets the list of parameters that the Heat author has exempted from following
+ the naming conventions associated with AAP.
+
+ :param resource_props: dict of properties under the resource ID
+ :return: list of all parameters to exempt or an empty list
+ """
+ metadata = resource_props.get("metadata") or {}
+ return metadata.get("aap_exempt") or []
+
+
+def check_parameter_format(
+ yaml_file, regx, intext, resource_processor, *properties, exemptions_allowed=False
+):
+ """
+ yaml_file: input file to check
+ regx: dictionary containing the regex to use to validate parameter
+ intext: internal or external
+ resource_processor: resource type specific helper, defined in structures.py
+ properties: arg list of property that is being checked
+ exemptions_allowed: If True, then parameters in the aap_exempt list are allowed to
+ not follow the rules
+ """
+
+ invalid_parameters = []
+ heat = Heat(filepath=yaml_file)
+ resource_type = resource_processor.resource_type
+ resources = heat.get_resource_by_type(resource_type)
+ heat_parameters = heat.parameters
+ for rid, resource in resources.items():
+ resource_intext, port_match = resource_processor.get_rid_match_tuple(rid)
+ if not port_match:
+ continue # port resource ID not formatted correctely
+
+ if (
+ resource_intext != intext
+ ): # skipping if type (internal/external) doesn't match
continue
- port_resource = None
+ for param in prop_iterator(resource, *properties):
+ if (
+ param
+ and isinstance(param, dict)
+ and "get_resource" not in param
+ and "get_attr" not in param
+ ):
+ # checking parameter uses get_param
+ parameter = param.get("get_param")
+ if not parameter:
+ msg = (
+ "Unexpected parameter format for {} {} property {}: {}. "
+ "Please consult the heat guidelines documentation for details."
+ ).format(resource_type, rid, properties, param)
+ invalid_parameters.append(msg) # should this be a failure?
+ continue
- vm_type = get_vm_type_for_nova_server(v)
- if not vm_type:
- continue
+ # getting parameter if the get_param uses list, and getting official
+ # HEAT parameter type
+ parameter_type = parameter_type_to_heat_type(parameter)
+ if parameter_type == "comma_delimited_list":
+ parameter = parameter[0]
+ elif parameter_type != "string":
+ continue
- # get all ports associated with the nova server
- properties = v['properties']
- for network in properties['networks']:
- for k3, v3 in network.items():
- if k3 != 'port':
+ # checking parameter format = parameter type defined in parameters
+ # section
+ heat_parameter_type = nested_dict.get(
+ heat_parameters, parameter, "type"
+ )
+ if not heat_parameter_type or heat_parameter_type != parameter_type:
+ msg = (
+ "{} {} parameter {} defined as type {} "
+ + "is being used as type {} in the heat template"
+ ).format(
+ resource_type,
+ properties,
+ parameter,
+ heat_parameter_type,
+ parameter_type,
+ )
+ invalid_parameters.append(msg) # should this actually be an error?
continue
- if not isinstance(v3, dict):
+
+ if exemptions_allowed and parameter in get_aap_exemptions(resource):
continue
- if 'get_resource' in v3:
- port_id = v3['get_resource']
- if not resources[port_id]:
- continue
- port_resource = resources[port_id]
- else:
+ # if parameter type is not in regx dict, then it is not supported
+ # by automation
+ regx_dict = regx[resource_intext].get(parameter_type)
+ if not regx_dict:
+ msg = (
+ "{} {} {} parameter {} defined as type {} "
+ "which is required by platform data model for proper "
+ "assignment and inventory."
+ ).format(resource_type, rid, properties, parameter, parameter_type)
+ if exemptions_allowed:
+ msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT)
+ invalid_parameters.append(msg)
continue
- network_role = get_network_role_from_port(port_resource)
- if not network_role:
+ # checking if param adheres to guidelines format
+ regexp = regx[resource_intext][parameter_type]["machine"]
+ readable_format = regx[resource_intext][parameter_type]["readable"]
+ match = regexp.match(parameter)
+ if not match:
+ msg = (
+ "{} {} property {} parameter {} does not follow {} "
+ "format {} which is required by platform data model for proper "
+ "assignment and inventory."
+ ).format(
+ resource_type,
+ rid,
+ properties,
+ parameter,
+ resource_intext,
+ readable_format,
+ )
+ if exemptions_allowed:
+ msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT)
+ invalid_parameters.append(msg)
continue
- for k1, v1 in port_resource["properties"].items():
- if k1 != port_property:
+ # checking that parameter includes correct vm_type/network_role
+ parameter_checks = regx.get("parameter_to_resource_comparisons", [])
+ for check in parameter_checks:
+ resource_match = port_match.group(check)
+ if (
+ resource_match
+ and not parameter.startswith(resource_match)
+ and parameter.find("_{}_".format(resource_match)) == -1
+ ):
+ msg = (
+ "{0} {1} property {2} parameter "
+ "{3} {4} does match resource {4} {5}"
+ ).format(
+ resource_type,
+ rid,
+ properties,
+ parameter,
+ check,
+ resource_match,
+ )
+ invalid_parameters.append(msg)
continue
- for v2 in v1:
- if "ip_address" not in v2:
- continue
- if "get_param" not in v2["ip_address"]:
- continue
-
- ip_address = v2["ip_address"]["get_param"]
-
- if isinstance(ip_address, list):
- ip_address = ip_address[0]
-
- valid_ip_address = is_valid_ip_address(ip_address,
- vm_type,
- network_role,
- port_property)
-
- if not valid_ip_address:
- invalid_ip_addresses.append(ip_address)
-
- return invalid_ip_addresses
-
-
-def is_reserved_port(port_id):
- '''
- Checks to see if the resource id for a port follows
- the reserve port concept
- '''
- formats = [
- ["port_id",
- re.compile(r'reserve_port_(.+?)_floating_ip_\d+')],
- ["port_id",
- re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')],
- ]
- for f in formats:
- m = f[1].match(port_id.lower())
- if m and m.group(1):
- return True
- return False
+
+ assert not invalid_parameters, "%s" % "\n".join(invalid_parameters)
+
+
+def get_list_of_ports_attached_to_nova_server(nova_server):
+ networks_list = nova_server.get("properties", {}).get("networks")
+
+ port_ids = []
+ if networks_list:
+ for network in networks_list:
+ network_prop = network.get("port")
+ if network_prop:
+ pid = network_prop.get("get_param")
+ if not pid:
+ pid = network_prop.get("get_resource")
+ port_ids.append(pid)
+
+ return port_ids