X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=ice_validator%2Ftests%2Futils%2Fports.py;h=c005e32022dff959b10f32698d9dd1dd84bf031e;hb=f257796cdb575d5079dce9738e31808c089f4cf3;hp=f5db5a42d07ce2cea38e86d2ff638440caf6ec39;hpb=8540eb59f7e7f8ff2043a8eaf7edfc255a60874a;p=vvp%2Fvalidation-scripts.git diff --git a/ice_validator/tests/utils/ports.py b/ice_validator/tests/utils/ports.py index f5db5a4..c005e32 100644 --- a/ice_validator/tests/utils/ports.py +++ b/ice_validator/tests/utils/ports.py @@ -36,196 +36,120 @@ # ============LICENSE_END============================================ # # +from .network_roles import get_network_role_and_type +from tests.structures import Heat, NeutronPortProcessor +from tests.helpers import parameter_type_to_heat_type +from . import nested_dict -from .network_roles import get_network_role_from_port -from .vm_types import get_vm_type_for_nova_server -import re - -def is_valid_ip_address( - ip_address, vm_type, network_role, port_property, parameter_type -): +def check_ip_format(yaml_file, regx, port_type, resource_property, nested_property): """ - Check the ip_address to make sure it is properly formatted and - also contains {vm_type} and {network_role} + yaml_file: input file to check + regx: dictionary containing the regex to use to validate parameter + port_type: internal or external + resource_property: OS::Neutron::Port property to check for parameter + nested_property: resource_property will be a list of dicts, this is the key to index into """ - - allowed_formats = [ - [ - "allowed_address_pairs", - "string", - "internal", - re.compile(r"(.+?)_int_(.+?)_floating_v6_ip"), - ], - [ - "allowed_address_pairs", - "string", - "internal", - re.compile(r"(.+?)_int_(.+?)_floating_ip"), - ], - [ - "allowed_address_pairs", - "string", - "external", - re.compile(r"(.+?)_floating_v6_ip"), - ], - [ - "allowed_address_pairs", - "string", - "external", - re.compile(r"(.+?)_floating_ip"), - ], - [ - "allowed_address_pairs", - "string", - "internal", - re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+"), - ], - [ - "allowed_address_pairs", - "string", - "internal", - re.compile(r"(.+?)_int_(.+?)_ip_\d+"), - ], - ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], - ["allowed_address_pairs", "string", "external", re.compile(r"(.+?)_ip_\d+")], - [ - "allowed_address_pairs", - "comma_delimited_list", - "internal", - re.compile(r"(.+?)_int_(.+?)_v6_ips"), - ], - [ - "allowed_address_pairs", - "comma_delimited_list", - "internal", - re.compile(r"(.+?)_int_(.+?)_ips"), - ], - [ - "allowed_address_pairs", - "comma_delimited_list", - "external", - re.compile(r"(.+?)_v6_ips"), - ], - [ - "allowed_address_pairs", - "comma_delimited_list", - "external", - re.compile(r"(.+?)_ips"), - ], - ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_v6_ip_\d+")], - ["fixed_ips", "string", "internal", re.compile(r"(.+?)_int_(.+?)_ip_\d+")], - ["fixed_ips", "string", "external", re.compile(r"(.+?)_v6_ip_\d+")], - ["fixed_ips", "string", "external", re.compile(r"(.+?)_ip_\d+")], - [ - "fixed_ips", - "comma_delimited_list", - "internal", - re.compile(r"(.+?)_int_(.+?)_v6_ips"), - ], - [ - "fixed_ips", - "comma_delimited_list", - "internal", - re.compile(r"(.+?)_int_(.+?)_ips"), - ], - ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_v6_ips")], - ["fixed_ips", "comma_delimited_list", "external", re.compile(r"(.+?)_ips")], - ] - - for v3 in allowed_formats: - if v3[1] != parameter_type: - continue - if v3[0] != port_property: - continue - # check if pattern matches - m = v3[3].match(ip_address) - if m: - if v3[2] == "internal" and len(m.groups()) > 1: - return m.group(1) == vm_type and m.group(2) == network_role - elif v3[2] == "external" and len(m.groups()) > 0: - return m.group(1) == vm_type + "_" + network_role - - return False - - -def get_invalid_ip_addresses(resources, port_property, parameters): - """ - Get a list of valid ip addresses for a heat resources section - """ - invalid_ip_addresses = [] - - for k, v in resources.items(): - if not isinstance(v, dict): - continue - if "type" not in v: + invalid_ips = [] + heat = Heat(filepath=yaml_file) + ports = heat.get_resource_by_type("OS::Neutron::Port") + heat_parameters = heat.parameters + + for rid, resource in ports.items(): + network_role, network_type = get_network_role_and_type(resource) + if ( + network_type != port_type + ): # skipping if port type (internal/external) doesn't match continue - if v["type"] not in "OS::Nova::Server": - continue - if "properties" not in v: - continue - if "networks" not in v["properties"]: - continue - - port_resource = None - - vm_type = get_vm_type_for_nova_server(v) - if not vm_type: - continue - - # get all ports associated with the nova server - properties = v["properties"] - for network in properties["networks"]: - for k3, v3 in network.items(): - if k3 != "port": - continue - if not isinstance(v3, dict): - continue - - if "get_resource" in v3: - port_id = v3["get_resource"] - if not resources[port_id]: - continue - port_resource = resources[port_id] - else: - continue - network_role = get_network_role_from_port(port_resource) - if not network_role: + name, port_match = NeutronPortProcessor.get_rid_match_tuple(rid) + if not port_match: + continue # port resource ID not formatted correctely + + params = nested_dict.get(resource, "properties", resource_property, default={}) + + for param in params: + prop = nested_dict.get(param, nested_property) + if ( + not prop + or not isinstance(prop, dict) + or "get_resource" in prop + or "get_attr" in prop + # or "str_replace" in prop - should str_replace be checked? + ): + continue # lets only check parameters shall we? + + # checking parameter uses get_param + parameter = nested_dict.get(prop, "get_param") + if not parameter: + msg = ( + "Unexpected parameter format for OS::Neutron::Port {} property {}: {}. " + + "Please consult the heat guidelines documentation for details." + ).format(rid, resource_property, prop) + invalid_ips.append(msg) # should this be a failure? + continue + + # getting parameter if the get_param uses list, and getting official HEAT parameter type + parameter_type = parameter_type_to_heat_type(parameter) + if parameter_type == "comma_delimited_list": + parameter = parameter[0] + elif parameter_type != "string": + continue + + # checking parameter format = type defined in template + heat_parameter_type = nested_dict.get(heat_parameters, parameter, "type") + if not heat_parameter_type or heat_parameter_type != parameter_type: + msg = ( + "OS::Neutron::Port {} parameter {} defined as type {} " + + "is being used as type {} in the heat template" + ).format( + resource_property, parameter, heat_parameter_type, parameter_type + ) + invalid_ips.append(msg) + continue + + # if parameter type is not in regx dict, then it is not supported by automation + regx_dict = regx[port_type].get(parameter_type) + if not regx_dict: + msg = ( + "WARNING: OS::Neutron::Port {} parameter {} defined as type {} " + + "is not supported by platform automation. If this VNF is not able " + + "to adhere to this requirement, please consult the Heat Orchestration " + + "Template guidelines for alternative solutions. If already adhering to " + + "an alternative provided by the heat guidelines, please disregard this " + + "message." + ).format(resource_property, parameter, parameter_type) + invalid_ips.append(msg) + continue + + # checking if param adheres to guidelines format + regexp = regx[port_type][parameter_type]["machine"] + readable_format = regx[port_type][parameter_type]["readable"] + match = regexp.match(parameter) + if not match: + msg = "{} parameter {} does not follow format {}".format( + resource_property, parameter, readable_format + ) + invalid_ips.append(msg) + continue + + # checking that parameter includes correct vm_type/network_role + parameter_checks = regx.get("parameter_to_resource_comparisons", []) + for check in parameter_checks: + resource_match = port_match.group(check) + if ( + resource_match + and not parameter.startswith(resource_match) + and parameter.find("_{}_".format(resource_match)) == -1 + ): + msg = ( + "OS::Neutron::Port {0} property {1} parameter " + + "{2} {3} does match resource {3} {4}" + ).format(rid, resource_property, parameter, check, resource_match) + invalid_ips.append(msg) continue - for k1, v1 in port_resource["properties"].items(): - if k1 != port_property: - continue - for v2 in v1: - if "ip_address" not in v2: - continue - if "get_param" not in v2["ip_address"]: - continue - ip_address = v2["ip_address"]["get_param"] - - if isinstance(ip_address, list): - ip_address = ip_address[0] - - if ip_address not in parameters: - continue - - parameter_type = parameters[ip_address].get("type") - if not parameter_type: - continue - - valid_ip_address = is_valid_ip_address( - ip_address, - vm_type, - network_role, - port_property, - parameter_type, - ) - - if not valid_ip_address: - invalid_ip_addresses.append(ip_address) - - return invalid_ip_addresses + assert not invalid_ips, "%s" % "\n".join(invalid_ips) def get_list_of_ports_attached_to_nova_server(nova_server):