X-Git-Url: https://gerrit.onap.org/r/gitweb?p=vvp%2Fvalidation-scripts.git;a=blobdiff_plain;f=ice_validator%2Ftests%2Fhelpers.py;h=291464def3d19971787327166ea77c24fd9f967c;hp=9cde1faf11ff4b149b2dec2e4a9aca3829c88a9c;hb=f5190cf61981eff1eb59157d4d2f8bd06acb3570;hpb=cc21b8b08b6dbcec577bfb26ff397ac899da8002 diff --git a/ice_validator/tests/helpers.py b/ice_validator/tests/helpers.py index 9cde1fa..291464d 100644 --- a/ice_validator/tests/helpers.py +++ b/ice_validator/tests/helpers.py @@ -1,12 +1,12 @@ # -*- coding: utf8 -*- -# ============LICENSE_START======================================================= +# ============LICENSE_START==================================================== # org.onap.vvp/validation-scripts # =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# Copyright © 2019 AT&T Intellectual Property. All rights reserved. # =================================================================== # # Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); +# under the Apache License, Version 2.0 (the "License"); # you may not use this software except in compliance with the License. # You may obtain a copy of the License at # @@ -21,7 +21,7 @@ # # # Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); +# under the Creative Commons License, Attribution 4.0 Intl. (the "License"); # you may not use this documentation except in compliance with the License. # You may obtain a copy of the License at # @@ -35,44 +35,388 @@ # # ============LICENSE_END============================================ # -# ECOMP is a trademark and service mark of AT&T Intellectual Property. # -import yaml +"""Helpers +""" + +import os +import re +import zipfile +from collections import defaultdict +from typing import Set + +from boltons import funcutils +from tests import cached_yaml as yaml + +__path__ = [os.path.dirname(os.path.abspath(__file__))] +DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0]) +RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)") + + +def is_base_module(template_path): + basename = os.path.basename(template_path).lower() + name, extension = os.path.splitext(basename) + is_yaml = extension in {".yml", ".yaml"} + return is_yaml and RE_BASE.search(name) and not name.endswith("_volume") def check_basename_ending(template_type, basename): - ''' + """ return True/False if the template type is matching the filename - ''' + """ if not template_type: return True - elif template_type == 'volume': - return basename.endswith('_volume') + elif template_type == "volume": + return basename.endswith("_volume") else: - return not basename.endswith('_volume') + return not basename.endswith("_volume") -def get_parsed_yml_for_yaml_files(yaml_files, sections=[]): - ''' +def get_parsed_yml_for_yaml_files(yaml_files, sections=None): + """ get the parsed yaml for a list of yaml files - ''' + """ + sections = [] if sections is None else sections parsed_yml_list = [] for yaml_file in yaml_files: - yml = '' - try: with open(yaml_file) as fh: yml = yaml.load(fh) - except Exception as e: - print(e) - + except yaml.YAMLError as e: + # pylint: disable=superfluous-parens + print("Error in %s: %s" % (yaml_file, e)) + continue if yml: if sections: for k in yml.keys(): if k not in sections: del yml[k] parsed_yml_list.append(yml) - return parsed_yml_list + + +def validates(*requirement_ids): + """Decorator that tags the test function with one or more requirement IDs. + + Example: + >>> @validates('R-12345', 'R-12346') + ... def test_something(): + ... pass + >>> assert test_something.requirement_ids == ['R-12345', 'R-12346'] + """ + # pylint: disable=missing-docstring + def decorator(func): + # NOTE: We use a utility here to ensure that function signatures are + # maintained because pytest inspects function signatures to inject + # fixtures. I experimented with a few options, but this is the only + # library that worked. Other libraries dynamically generated a + # function at run-time, and then lost the requirement_ids attribute + @funcutils.wraps(func) + def wrapper(*args, **kw): + return func(*args, **kw) + + wrapper.requirement_ids = requirement_ids + return wrapper + + decorator.requirement_ids = requirement_ids + return decorator + + +def categories(*categories): + def decorator(func): + @funcutils.wraps(func) + def wrapper(*args, **kw): + return func(*args, **kw) + + wrapper.categories = categories + return wrapper + + decorator.categories = categories + return decorator + + +def get_environment_pair(heat_template): + """Returns a yaml/env pair given a yaml file""" + base_dir, filename = os.path.split(heat_template) + basename = os.path.splitext(filename)[0] + env_template = os.path.join(base_dir, "{}.env".format(basename)) + if os.path.exists(env_template): + with open(heat_template, "r") as fh: + yyml = yaml.load(fh) + with open(env_template, "r") as fh: + eyml = yaml.load(fh) + + environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml} + return environment_pair + + return None + + +def find_environment_file(yaml_files): + """ + Pass file and recursively step backwards until environment file is found + + :param yaml_files: list or string, start at size 1 and grows recursively + :return: corresponding environment file for a file, or None + """ + # sanitize + if isinstance(yaml_files, str): + yaml_files = [yaml_files] + + yaml_file = yaml_files[-1] + filepath, filename = os.path.split(yaml_file) + + environment_pair = get_environment_pair(yaml_file) + if environment_pair: + return environment_pair + + for file in os.listdir(filepath): + fq_name = "{}/{}".format(filepath, file) + if fq_name.endswith("yaml") or fq_name.endswith("yml"): + if fq_name not in yaml_files: + with open(fq_name) as f: + yml = yaml.load(f) + resources = yml.get("resources", {}) + for resource_id, resource in resources.items(): + resource_type = resource.get("type", "") + if resource_type == "OS::Heat::ResourceGroup": + resource_type = ( + resource.get("properties", {}) + .get("resource_def", {}) + .get("type", "") + ) + # found called nested file + if resource_type == filename: + yaml_files.append(fq_name) + environment_pair = find_environment_file(yaml_files) + + return environment_pair + + +def load_yaml(yaml_file): + """ + Load the YAML file at the given path. If the file has previously been + loaded, then a cached version will be returned. + + :param yaml_file: path to the YAML file + :return: data structure loaded from the YAML file + """ + with open(yaml_file) as fh: + return yaml.load(fh) + + +def traverse(data, search_key, func, path=None): + """ + Traverse the data structure provided via ``data`` looking for occurences + of ``search_key``. When ``search_key`` is found, the value associated + with that key is passed to ``func`` + + :param data: arbitrary data structure of dicts and lists + :param search_key: key field to search for + :param func: Callable object that takes two parameters: + * A list representing the path of keys to search_key + * The value associated with the search_key + """ + path = [] if path is None else path + if isinstance(data, dict): + for key, value in data.items(): + curr_path = path + [key] + if key == search_key: + func(curr_path, value) + traverse(value, search_key, func, curr_path) + elif isinstance(data, list): + for value in data: + curr_path = path + [value] + if isinstance(value, (dict, list)): + traverse(value, search_key, func, curr_path) + elif value == search_key: + func(curr_path, value) + + +def check_indices(pattern, values, value_type): + """ + Checks that indices associated with the matched prefix start at 0 and + increment by 1. It returns a list of messages for any prefixes that + violate the rules. + + :param pattern: Compiled regex that whose first group matches the prefix and + second group matches the index + :param values: sequence of string names that may or may not match the pattern + :param name: Type of value being checked (ex: IP Parameters). This will + be included in the error messages. + :return: List of error messages, empty list if no violations found + """ + if not hasattr(pattern, "match"): + raise RuntimeError("Pattern must be a compiled regex") + + prefix_indices = defaultdict(set) + for value in values: + m = pattern.match(value) + if m: + prefix_indices[m.group(1)].add(int(m.group(2))) + + invalid_params = [] + for prefix, indices in prefix_indices.items(): + indices = sorted(indices) + if indices[0] != 0: + invalid_params.append( + "{} with prefix {} do not start at 0".format(value_type, prefix) + ) + elif len(indices) - 1 != indices[-1]: + invalid_params.append( + ( + "Index values of {} with prefix {} do not " + "increment by 1: {}" + ).format(value_type, prefix, indices) + ) + return invalid_params + + +def get_base_template_from_yaml_files(yaml_files): + """Return first filepath to match RE_BASE + """ + for filepath in yaml_files: + basename = get_base_template_from_yaml_file(filepath) + if basename: + return basename + return None + + +def get_base_template_from_yaml_file(yaml_file): + (dirname, filename) = os.path.split(yaml_file) + files = os.listdir(dirname) + for file in files: + basename, __ = os.path.splitext(os.path.basename(file)) + if ( + (__ == ".yaml" or __ == ".yml") + and RE_BASE.search(basename) + and basename.find("volume") == -1 + ): + return os.path.join(dirname, "{}{}".format(basename, __)) + return None + + +def parameter_type_to_heat_type(parameter): + # getting parameter format + if isinstance(parameter, list): + parameter_type = "comma_delimited_list" + elif isinstance(parameter, str): + parameter_type = "string" + elif isinstance(parameter, dict): + parameter_type = "json" + elif isinstance(parameter, int) or isinstance(parameter, float): + parameter_type = "number" + elif isinstance(parameter, bool): + parameter_type = "boolean" + else: + parameter_type = None + + return parameter_type + + +def prop_iterator(resource, *props): + terminators = ["get_resource", "get_attr", "str_replace", "get_param"] + if "properties" in resource: + resource = resource.get("properties") + props = list(props) + + if isinstance(resource, dict) and any(x for x in terminators if x in resource): + yield resource + else: + prop = resource.get(props.pop(0)) + if isinstance(prop, list): + for x in prop: + yield from prop_iterator(x, *props) + elif isinstance(prop, dict): + yield from prop_iterator(prop, *props) + + +def get_param(property_value): + """ + Returns the first parameter name from a get_param or None if get_param is + not used + """ + if property_value and isinstance(property_value, dict): + param = property_value.get("get_param") + if param and isinstance(param, list) and len(param) > 0: + return param[0] + else: + return param + return None + + +def get_output_dir(config): + """ + Retrieve the output directory for the reports and create it if necessary + :param config: pytest configuration + :return: output directory as string + """ + output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR + if not os.path.exists(output_dir): + os.makedirs(output_dir, exist_ok=True) + return output_dir + + +def first(seq, predicate, default=None): + """ + Return the first item in sequence that satisfies the callable, predicate, or + returns the default if not found. + + :param seq: iterable sequence of objects + :param predicate: callable that accepts one item from the sequence + :param default: value to return if not found (default is None) + :return: default value if no item satisfies the predicate + """ + return next((i for i in seq if predicate(i)), default) + + +def check(predicate, message): + """ + Raise a RuntimeError with the provided message if predicate is False. + + Example: + check(path.is_file(), "{} must be a file".format(path.as_posix())) + + :param predicate: boolean condition + :param message: message + """ + if not predicate: + raise RuntimeError(message) + + +def unzip(zip_path, target_dir): + """ + Extracts a Zip archive located at zip_path to target_dir (which will be + created if it already exists) + + :param zip_path: path to valid zip file + :param target_dir: directory to unzip zip_path + """ + check(zipfile.is_zipfile(zip_path), "{} is not a valid zipfile or does not exist".format(zip_path)) + archive = zipfile.ZipFile(zip_path) + if not os.path.exists(target_dir): + os.makedirs(target_dir, exist_ok=True) + archive.extractall(path=target_dir) + + +def remove(sequence, exclude, key=None): + """ + Remove a copy of sequence that items occur in exclude. + + :param sequence: sequence of objects + :param exclude: objects to excluded (must support ``in`` check) + :param key: optional function to extract key from item in sequence + :return: list of items not in the excluded + """ + key_func = key if key else lambda x: x + result = (s for s in sequence if key_func(s) not in exclude) + return set(result) if isinstance(sequence, Set) else list(result) + + +def is_nova_server(resource): + """ + checks resource is a nova server + """ + return isinstance(resource, dict) and "type" in resource and "properties" in resource and resource.get("type") == "OS::Nova::Server" +