X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=ice_validator%2Ftests%2Futils%2Fports.py;h=8c25df77bbb68b53b234e1d3e5580233e290f152;hb=5ff7ed0cf3ac9e8110579ee4f0f711e30fb2511e;hp=4d0b4ca1d959dbd0ba114d99b2225ac539ab4bdb;hpb=655f39713cca2595a812ccd60cc738301aef8b2f;p=vvp%2Fvalidation-scripts.git diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py index 4d0b4ca..8c25df7 100644 --- a/ice_validator/tests/utils/ports.py +++ b/ice_validator/tests/utils/ports.py @@ -2,11 +2,11 @@ # ============LICENSE_START======================================================= # org.onap.vvp/validation-scripts # =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# Copyright © 2019 AT&T Intellectual Property. All rights reserved. # =================================================================== # # Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); +# under the Apache License, Version 2.0 (the "License"); # you may not use this software except in compliance with the License. # You may obtain a copy of the License at # @@ -21,7 +21,7 @@ # # # Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# under the Creative Commons License, Attribution 4.0 Intl. (the "License"); # you may not use this documentation except in compliance with the License. # You may obtain a copy of the License at # @@ -35,163 +35,180 @@ # # ============LICENSE_END============================================ # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. # - -from .network_roles import get_network_role_from_port -from .vm_types import get_vm_type_for_nova_server -import re - - -def is_valid_ip_address(ip_address, vm_type, network_role, port_property): - ''' - Check the ip_address to make sure it is properly formatted and - also contains {vm_type} and {network_role} - ''' - - allowed_formats = [ - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_floating_v6_ip')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_floating_ip')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_floating_v6_ip')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_floating_ip')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], - ["allowed_address_pairs", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_ip_\d+')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_v6_ip_\d+')], - ["allowed_address_pairs", "string", "external", - re.compile(r'(.+?)_ip_\d+')], - ["allowed_address_pairs", "comma_delimited_list", - "internal", re.compile(r'(.+?)_int_(.+?)_v6_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "internal", re.compile(r'(.+?)_int_(.+?)_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "external", re.compile(r'(.+?)_v6_ips')], - ["allowed_address_pairs", "comma_delimited_list", - "external", re.compile(r'(.+?)_ips')], - ["fixed_ips", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ip_\d+')], - ["fixed_ips", "string", "internal", - re.compile(r'(.+?)_int_(.+?)_ip_\d+')], - ["fixed_ips", "string", "external", - re.compile(r'(.+?)_v6_ip_\d+')], - ["fixed_ips", "string", "external", - re.compile(r'(.+?)_ip_\d+')], - ["fixed_ips", "comma_delimited_list", "internal", - re.compile(r'(.+?)_int_(.+?)_v6_ips')], - ["fixed_ips", "comma_delimited_list", "internal", - re.compile(r'(.+?)_int_(.+?)_ips')], - ["fixed_ips", "comma_delimited_list", "external", - re.compile(r'(.+?)_v6_ips')], - ["fixed_ips", "comma_delimited_list", "external", - re.compile(r'(.+?)_ips')], - ] - - for v3 in allowed_formats: - if v3[0] != port_property: - continue - # check if pattern matches - m = v3[3].match(ip_address) - if m: - if (v3[2] == "internal" and - len(m.groups()) > 1): - return m.group(1) == vm_type and\ - m.group(2) == network_role - elif (v3[2] == "external" and - len(m.groups()) > 0): - return m.group(1) == vm_type + "_" + network_role - - return False - - -def get_invalid_ip_addresses(resources, port_property): - ''' - Get a list of valid ip addresses for a heat resources section - ''' - invalid_ip_addresses = [] - - for k, v in resources.items(): - if not isinstance(v, dict): - continue - if 'type' not in v: - continue - if v['type'] not in 'OS::Nova::Server': - continue - if 'properties' not in v: - continue - if 'networks' not in v['properties']: +from tests.structures import Heat +from tests.helpers import parameter_type_to_heat_type, prop_iterator +from . import nested_dict + + +AAP_EXEMPT_CAVEAT = ( + "If this VNF is not able to adhere to this requirement, please consult the Heat " + "Orchestration Template guidelines for more information. If you are knowingly " + "violating this requirement after reading the guidelines, then add the parameter " + "to the aap_exempt list under this resources metadata to suppress this warning." +) + + +def get_aap_exemptions(resource_props): + """ + Gets the list of parameters that the Heat author has exempted from following + the naming conventions associated with AAP. + + :param resource_props: dict of properties under the resource ID + :return: list of all parameters to exempt or an empty list + """ + metadata = resource_props.get("metadata") or {} + return metadata.get("aap_exempt") or [] + + +def check_parameter_format( + yaml_file, regx, intext, resource_processor, *properties, exemptions_allowed=False +): + """ + yaml_file: input file to check + regx: dictionary containing the regex to use to validate parameter + intext: internal or external + resource_processor: resource type specific helper, defined in structures.py + properties: arg list of property that is being checked + exemptions_allowed: If True, then parameters in the aap_exempt list are allowed to + not follow the rules + """ + + invalid_parameters = [] + heat = Heat(filepath=yaml_file) + resource_type = resource_processor.resource_type + resources = heat.get_resource_by_type(resource_type) + heat_parameters = heat.parameters + for rid, resource in resources.items(): + resource_intext, port_match = resource_processor.get_rid_match_tuple(rid) + if not port_match: + continue # port resource ID not formatted correctely + + if ( + resource_intext != intext + ): # skipping if type (internal/external) doesn't match continue - port_resource = None + for param in prop_iterator(resource, *properties): + if ( + param + and isinstance(param, dict) + and "get_resource" not in param + and "get_attr" not in param + ): + # checking parameter uses get_param + parameter = param.get("get_param") + if not parameter: + msg = ( + "Unexpected parameter format for {} {} property {}: {}. " + "Please consult the heat guidelines documentation for details." + ).format(resource_type, rid, properties, param) + invalid_parameters.append(msg) # should this be a failure? + continue - vm_type = get_vm_type_for_nova_server(v) - if not vm_type: - continue + # getting parameter if the get_param uses list, and getting official + # HEAT parameter type + parameter_type = parameter_type_to_heat_type(parameter) + if parameter_type == "comma_delimited_list": + parameter = parameter[0] + elif parameter_type != "string": + continue - # get all ports associated with the nova server - properties = v['properties'] - for network in properties['networks']: - for k3, v3 in network.items(): - if k3 != 'port': + # checking parameter format = parameter type defined in parameters + # section + heat_parameter_type = nested_dict.get( + heat_parameters, parameter, "type" + ) + if not heat_parameter_type or heat_parameter_type != parameter_type: + msg = ( + "{} {} parameter {} defined as type {} " + + "is being used as type {} in the heat template" + ).format( + resource_type, + properties, + parameter, + heat_parameter_type, + parameter_type, + ) + invalid_parameters.append(msg) # should this actually be an error? continue - if not isinstance(v3, dict): + + if exemptions_allowed and parameter in get_aap_exemptions(resource): continue - if 'get_resource' in v3: - port_id = v3['get_resource'] - if not resources[port_id]: - continue - port_resource = resources[port_id] - else: + # if parameter type is not in regx dict, then it is not supported + # by automation + regx_dict = regx[resource_intext].get(parameter_type) + if not regx_dict: + msg = ( + "{} {} parameter {} defined as type {} " + "which is required by platform data model for proper " + "assignment and inventory." + ).format(resource_type, properties, parameter, parameter_type) + if exemptions_allowed: + msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT) + invalid_parameters.append(msg) continue - network_role = get_network_role_from_port(port_resource) - if not network_role: + # checking if param adheres to guidelines format + regexp = regx[resource_intext][parameter_type]["machine"] + readable_format = regx[resource_intext][parameter_type]["readable"] + match = regexp.match(parameter) + if not match: + msg = ( + "{} {} property {} parameter {} does not follow {} " + "format {} which is required by platform data model for proper " + "assignment and inventory." + ).format( + resource_type, + rid, + properties, + parameter, + resource_intext, + readable_format, + ) + if exemptions_allowed: + msg = "WARNING: {} {}".format(msg, AAP_EXEMPT_CAVEAT) + invalid_parameters.append(msg) continue - for k1, v1 in port_resource["properties"].items(): - if k1 != port_property: + # checking that parameter includes correct vm_type/network_role + parameter_checks = regx.get("parameter_to_resource_comparisons", []) + for check in parameter_checks: + resource_match = port_match.group(check) + if ( + resource_match + and not parameter.startswith(resource_match) + and parameter.find("_{}_".format(resource_match)) == -1 + ): + msg = ( + "{0} {1} property {2} parameter " + "{3} {4} does match resource {4} {5}" + ).format( + resource_type, + rid, + properties, + parameter, + check, + resource_match, + ) + invalid_parameters.append(msg) continue - for v2 in v1: - if "ip_address" not in v2: - continue - if "get_param" not in v2["ip_address"]: - continue - - ip_address = v2["ip_address"]["get_param"] - - if isinstance(ip_address, list): - ip_address = ip_address[0] - - valid_ip_address = is_valid_ip_address(ip_address, - vm_type, - network_role, - port_property) - - if not valid_ip_address: - invalid_ip_addresses.append(ip_address) - - return invalid_ip_addresses - - -def is_reserved_port(port_id): - ''' - Checks to see if the resource id for a port follows - the reserve port concept - ''' - formats = [ - ["port_id", - re.compile(r'reserve_port_(.+?)_floating_ip_\d+')], - ["port_id", - re.compile(r'reserve_port_(.+?)_floating_v6_ip_\d+')], - ] - for f in formats: - m = f[1].match(port_id.lower()) - if m and m.group(1): - return True - return False + + assert not invalid_parameters, "%s" % "\n".join(invalid_parameters) + + +def get_list_of_ports_attached_to_nova_server(nova_server): + networks_list = nova_server.get("properties", {}).get("networks") + + port_ids = [] + if networks_list: + for network in networks_list: + network_prop = network.get("port") + if network_prop: + pid = network_prop.get("get_param") + if not pid: + pid = network_prop.get("get_resource") + port_ids.append(pid) + + return port_ids