X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=ice_validator%2Ftests%2Fhelpers.py;h=f4a368ce86f991050a16637ffb99338b41917a00;hb=f8590584ebf86014951c8bf19b9d7c14f9a10e81;hp=fd78c7581fc9227f9e9ef4a6569f1c0c6cfa1303;hpb=9ccbfe7e20b3914c37c0dec50762cec9da8a63c6;p=vvp%2Fvalidation-scripts.git diff --git a/ice_validator/tests/helpers.py b/ice_validator/tests/helpers.py index fd78c75..f4a368c 100644 --- a/ice_validator/tests/helpers.py +++ b/ice_validator/tests/helpers.py @@ -42,12 +42,40 @@ import os import re +import zipfile from collections import defaultdict +from typing import Set from boltons import funcutils from tests import cached_yaml as yaml -VERSION = "1.1.0" +__path__ = [os.path.dirname(os.path.abspath(__file__))] +DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0]) +RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)") + +INTRINSIC_FUNCTIONS = [ + "get_resource", + "get_attr", + "str_replace", + "get_param", + "list_join", + "get_file", + "resource_facade", + "Fn::Select", + "repeat", + "digest", + "str_split", + "yaql", + "map_replace", + "map_merge", +] + + +def is_base_module(template_path): + basename = os.path.basename(template_path).lower() + name, extension = os.path.splitext(basename) + is_yaml = extension in {".yml", ".yaml"} + return is_yaml and RE_BASE.search(name) and not name.endswith("_volume") def check_basename_ending(template_type, basename): @@ -113,16 +141,21 @@ def validates(*requirement_ids): return decorator -def categories(*categories): +def categories(*all_of, any_of=None): + any_of = set(any_of) if any_of else set() + all_of = set(all_of) if all_of else set() + def decorator(func): @funcutils.wraps(func) def wrapper(*args, **kw): return func(*args, **kw) - wrapper.categories = categories + wrapper.all_categories = all_of + wrapper.any_categories = any_of return wrapper - decorator.categories = categories + decorator.all_categories = all_of + decorator.any_categories = any_of return decorator @@ -218,7 +251,7 @@ def traverse(data, search_key, func, path=None): elif isinstance(data, list): for value in data: curr_path = path + [value] - if isinstance(value, dict): + if isinstance(value, (dict, list)): traverse(value, search_key, func, curr_path) elif value == search_key: func(curr_path, value) @@ -262,9 +295,6 @@ def check_indices(pattern, values, value_type): return invalid_params -RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)") - - def get_base_template_from_yaml_files(yaml_files): """Return first filepath to match RE_BASE """ @@ -297,9 +327,7 @@ def parameter_type_to_heat_type(parameter): parameter_type = "string" elif isinstance(parameter, dict): parameter_type = "json" - elif isinstance(parameter, int): - parameter_type = "number" - elif isinstance(parameter, float): + elif isinstance(parameter, int) or isinstance(parameter, float): parameter_type = "number" elif isinstance(parameter, bool): parameter_type = "boolean" @@ -310,12 +338,13 @@ def parameter_type_to_heat_type(parameter): def prop_iterator(resource, *props): - terminators = ["get_resource", "get_attr", "str_replace", "get_param"] if "properties" in resource: resource = resource.get("properties") props = list(props) - if isinstance(resource, dict) and any(x for x in terminators if x in resource): + if isinstance(resource, dict) and any( + x for x in INTRINSIC_FUNCTIONS if x in resource + ): yield resource else: prop = resource.get(props.pop(0)) @@ -338,3 +367,86 @@ def get_param(property_value): else: return param return None + + +def get_output_dir(config): + """ + Retrieve the output directory for the reports and create it if necessary + :param config: pytest configuration + :return: output directory as string + """ + output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR + if not os.path.exists(output_dir): + os.makedirs(output_dir, exist_ok=True) + return output_dir + + +def first(seq, predicate, default=None): + """ + Return the first item in sequence that satisfies the callable, predicate, or + returns the default if not found. + + :param seq: iterable sequence of objects + :param predicate: callable that accepts one item from the sequence + :param default: value to return if not found (default is None) + :return: default value if no item satisfies the predicate + """ + return next((i for i in seq if predicate(i)), default) + + +def check(predicate, message): + """ + Raise a RuntimeError with the provided message if predicate is False. + + Example: + check(path.is_file(), "{} must be a file".format(path.as_posix())) + + :param predicate: boolean condition + :param message: message + """ + if not predicate: + raise RuntimeError(message) + + +def unzip(zip_path, target_dir): + """ + Extracts a Zip archive located at zip_path to target_dir (which will be + created if it already exists) + + :param zip_path: path to valid zip file + :param target_dir: directory to unzip zip_path + """ + check( + zipfile.is_zipfile(zip_path), + "{} is not a valid zipfile or does not exist".format(zip_path), + ) + archive = zipfile.ZipFile(zip_path) + if not os.path.exists(target_dir): + os.makedirs(target_dir, exist_ok=True) + archive.extractall(path=target_dir) + + +def remove(sequence, exclude, key=None): + """ + Remove a copy of sequence that items occur in exclude. + + :param sequence: sequence of objects + :param exclude: objects to excluded (must support ``in`` check) + :param key: optional function to extract key from item in sequence + :return: list of items not in the excluded + """ + key_func = key if key else lambda x: x + result = (s for s in sequence if key_func(s) not in exclude) + return set(result) if isinstance(sequence, Set) else list(result) + + +def is_nova_server(resource): + """ + checks resource is a nova server + """ + return ( + isinstance(resource, dict) + and "type" in resource + and "properties" in resource + and resource.get("type") == "OS::Nova::Server" + )