added Is_nova_server method to helpers
[vvp/validation-scripts.git] / ice_validator / tests / helpers.py
old mode 100755 (executable)
new mode 100644 (file)
index 37fd01f..291464d
@@ -1,12 +1,12 @@
 # -*- coding: utf8 -*-
-# ============LICENSE_START=======================================================
+# ============LICENSE_START====================================================
 # org.onap.vvp/validation-scripts
 # ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
 # ===================================================================
 #
 # Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
+# under the Apache License, Version 2.0 (the "License");
 # you may not use this software except in compliance with the License.
 # You may obtain a copy of the License at
 #
@@ -21,7 +21,7 @@
 #
 #
 # Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
 # you may not use this documentation except in compliance with the License.
 # You may obtain a copy of the License at
 #
 #
 # ============LICENSE_END============================================
 #
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
 #
 
-import yaml
+"""Helpers
+"""
+
+import os
+import re
+import zipfile
+from collections import defaultdict
+from typing import Set
+
 from boltons import funcutils
+from tests import cached_yaml as yaml
+
+__path__ = [os.path.dirname(os.path.abspath(__file__))]
+DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
+RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
+
+
+def is_base_module(template_path):
+    basename = os.path.basename(template_path).lower()
+    name, extension = os.path.splitext(basename)
+    is_yaml = extension in {".yml", ".yaml"}
+    return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
 
 
 def check_basename_ending(template_type, basename):
-    '''
+    """
     return True/False if the template type is matching
     the filename
-    '''
+    """
     if not template_type:
         return True
-    elif template_type == 'volume':
-        return basename.endswith('_volume')
+    elif template_type == "volume":
+        return basename.endswith("_volume")
     else:
-        return not basename.endswith('_volume')
+        return not basename.endswith("_volume")
 
 
 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
-    '''
+    """
     get the parsed yaml for a list of yaml files
-    '''
+    """
     sections = [] if sections is None else sections
     parsed_yml_list = []
     for yaml_file in yaml_files:
-        yml = ''
-
         try:
             with open(yaml_file) as fh:
                 yml = yaml.load(fh)
-        except Exception as e:
-            print('Error in %s: %s' % (yaml_file, e))
+        except yaml.YAMLError as e:
+            # pylint: disable=superfluous-parens
+            print("Error in %s: %s" % (yaml_file, e))
             continue
         if yml:
             if sections:
@@ -76,7 +94,6 @@ def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
                     if k not in sections:
                         del yml[k]
             parsed_yml_list.append(yml)
-
     return parsed_yml_list
 
 
@@ -89,6 +106,7 @@ def validates(*requirement_ids):
         ...     pass
         >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
     """
+    # pylint: disable=missing-docstring
     def decorator(func):
         # NOTE: We use a utility here to ensure that function signatures are
         # maintained because pytest inspects function signatures to inject
@@ -98,7 +116,307 @@ def validates(*requirement_ids):
         @funcutils.wraps(func)
         def wrapper(*args, **kw):
             return func(*args, **kw)
+
         wrapper.requirement_ids = requirement_ids
         return wrapper
+
     decorator.requirement_ids = requirement_ids
     return decorator
+
+
+def categories(*categories):
+    def decorator(func):
+        @funcutils.wraps(func)
+        def wrapper(*args, **kw):
+            return func(*args, **kw)
+
+        wrapper.categories = categories
+        return wrapper
+
+    decorator.categories = categories
+    return decorator
+
+
+def get_environment_pair(heat_template):
+    """Returns a yaml/env pair given a yaml file"""
+    base_dir, filename = os.path.split(heat_template)
+    basename = os.path.splitext(filename)[0]
+    env_template = os.path.join(base_dir, "{}.env".format(basename))
+    if os.path.exists(env_template):
+        with open(heat_template, "r") as fh:
+            yyml = yaml.load(fh)
+        with open(env_template, "r") as fh:
+            eyml = yaml.load(fh)
+
+        environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
+        return environment_pair
+
+    return None
+
+
+def find_environment_file(yaml_files):
+    """
+    Pass file and recursively step backwards until environment file is found
+
+    :param yaml_files: list or string, start at size 1 and grows recursively
+    :return: corresponding environment file for a file, or None
+    """
+    # sanitize
+    if isinstance(yaml_files, str):
+        yaml_files = [yaml_files]
+
+    yaml_file = yaml_files[-1]
+    filepath, filename = os.path.split(yaml_file)
+
+    environment_pair = get_environment_pair(yaml_file)
+    if environment_pair:
+        return environment_pair
+
+    for file in os.listdir(filepath):
+        fq_name = "{}/{}".format(filepath, file)
+        if fq_name.endswith("yaml") or fq_name.endswith("yml"):
+            if fq_name not in yaml_files:
+                with open(fq_name) as f:
+                    yml = yaml.load(f)
+                resources = yml.get("resources", {})
+                for resource_id, resource in resources.items():
+                    resource_type = resource.get("type", "")
+                    if resource_type == "OS::Heat::ResourceGroup":
+                        resource_type = (
+                            resource.get("properties", {})
+                            .get("resource_def", {})
+                            .get("type", "")
+                        )
+                    # found called nested file
+                    if resource_type == filename:
+                        yaml_files.append(fq_name)
+                        environment_pair = find_environment_file(yaml_files)
+
+    return environment_pair
+
+
+def load_yaml(yaml_file):
+    """
+    Load the YAML file at the given path.  If the file has previously been
+    loaded, then a cached version will be returned.
+
+    :param yaml_file: path to the YAML file
+    :return: data structure loaded from the YAML file
+    """
+    with open(yaml_file) as fh:
+        return yaml.load(fh)
+
+
+def traverse(data, search_key, func, path=None):
+    """
+    Traverse the data structure provided via ``data`` looking for occurences
+    of ``search_key``.  When ``search_key`` is found, the value associated
+    with that key is passed to ``func``
+
+    :param data:        arbitrary data structure of dicts and lists
+    :param search_key:  key field to search for
+    :param func:        Callable object that takes two parameters:
+                        * A list representing the path of keys to search_key
+                        * The value associated with the search_key
+    """
+    path = [] if path is None else path
+    if isinstance(data, dict):
+        for key, value in data.items():
+            curr_path = path + [key]
+            if key == search_key:
+                func(curr_path, value)
+            traverse(value, search_key, func, curr_path)
+    elif isinstance(data, list):
+        for value in data:
+            curr_path = path + [value]
+            if isinstance(value, (dict, list)):
+                traverse(value, search_key, func, curr_path)
+            elif value == search_key:
+                func(curr_path, value)
+
+
+def check_indices(pattern, values, value_type):
+    """
+    Checks that indices associated with the matched prefix start at 0 and
+    increment by 1.  It returns a list of messages for any prefixes that
+    violate the rules.
+
+    :param pattern: Compiled regex that whose first group matches the prefix and
+                    second group matches the index
+    :param values:  sequence of string names that may or may not match the pattern
+    :param name:    Type of value being checked (ex: IP Parameters). This will
+                    be included in the error messages.
+    :return:        List of error messages, empty list if no violations found
+    """
+    if not hasattr(pattern, "match"):
+        raise RuntimeError("Pattern must be a compiled regex")
+
+    prefix_indices = defaultdict(set)
+    for value in values:
+        m = pattern.match(value)
+        if m:
+            prefix_indices[m.group(1)].add(int(m.group(2)))
+
+    invalid_params = []
+    for prefix, indices in prefix_indices.items():
+        indices = sorted(indices)
+        if indices[0] != 0:
+            invalid_params.append(
+                "{} with prefix {} do not start at 0".format(value_type, prefix)
+            )
+        elif len(indices) - 1 != indices[-1]:
+            invalid_params.append(
+                (
+                    "Index values of {} with prefix {} do not " + "increment by 1: {}"
+                ).format(value_type, prefix, indices)
+            )
+    return invalid_params
+
+
+def get_base_template_from_yaml_files(yaml_files):
+    """Return first filepath to match RE_BASE
+    """
+    for filepath in yaml_files:
+        basename = get_base_template_from_yaml_file(filepath)
+        if basename:
+            return basename
+    return None
+
+
+def get_base_template_from_yaml_file(yaml_file):
+    (dirname, filename) = os.path.split(yaml_file)
+    files = os.listdir(dirname)
+    for file in files:
+        basename, __ = os.path.splitext(os.path.basename(file))
+        if (
+            (__ == ".yaml" or __ == ".yml")
+            and RE_BASE.search(basename)
+            and basename.find("volume") == -1
+        ):
+            return os.path.join(dirname, "{}{}".format(basename, __))
+    return None
+
+
+def parameter_type_to_heat_type(parameter):
+    # getting parameter format
+    if isinstance(parameter, list):
+        parameter_type = "comma_delimited_list"
+    elif isinstance(parameter, str):
+        parameter_type = "string"
+    elif isinstance(parameter, dict):
+        parameter_type = "json"
+    elif isinstance(parameter, int) or isinstance(parameter, float):
+        parameter_type = "number"
+    elif isinstance(parameter, bool):
+        parameter_type = "boolean"
+    else:
+        parameter_type = None
+
+    return parameter_type
+
+
+def prop_iterator(resource, *props):
+    terminators = ["get_resource", "get_attr", "str_replace", "get_param"]
+    if "properties" in resource:
+        resource = resource.get("properties")
+    props = list(props)
+
+    if isinstance(resource, dict) and any(x for x in terminators if x in resource):
+        yield resource
+    else:
+        prop = resource.get(props.pop(0))
+        if isinstance(prop, list):
+            for x in prop:
+                yield from prop_iterator(x, *props)
+        elif isinstance(prop, dict):
+            yield from prop_iterator(prop, *props)
+
+
+def get_param(property_value):
+    """
+    Returns the first parameter name from a get_param or None if get_param is
+    not used
+    """
+    if property_value and isinstance(property_value, dict):
+        param = property_value.get("get_param")
+        if param and isinstance(param, list) and len(param) > 0:
+            return param[0]
+        else:
+            return param
+    return None
+
+
+def get_output_dir(config):
+    """
+    Retrieve the output directory for the reports and create it if necessary
+    :param config: pytest configuration
+    :return: output directory as string
+    """
+    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir, exist_ok=True)
+    return output_dir
+
+
+def first(seq, predicate, default=None):
+    """
+    Return the first item in sequence that satisfies the callable, predicate, or
+    returns the default if not found.
+
+    :param seq:         iterable sequence of objects
+    :param predicate:   callable that accepts one item from the sequence
+    :param default:     value to return if not found (default is None)
+    :return:            default value if no item satisfies the predicate
+    """
+    return next((i for i in seq if predicate(i)), default)
+
+
+def check(predicate, message):
+    """
+    Raise a RuntimeError with the provided message if predicate is False.
+
+    Example:
+        check(path.is_file(), "{} must be a file".format(path.as_posix()))
+
+    :param predicate:   boolean condition
+    :param message:     message
+    """
+    if not predicate:
+        raise RuntimeError(message)
+
+
+def unzip(zip_path, target_dir):
+    """
+    Extracts a Zip archive located at zip_path to target_dir (which will be
+    created if it already exists)
+
+    :param zip_path:    path to valid zip file
+    :param target_dir:  directory to unzip zip_path
+    """
+    check(zipfile.is_zipfile(zip_path), "{} is not a valid zipfile or does not exist".format(zip_path))
+    archive = zipfile.ZipFile(zip_path)
+    if not os.path.exists(target_dir):
+        os.makedirs(target_dir, exist_ok=True)
+    archive.extractall(path=target_dir)
+
+
+def remove(sequence, exclude, key=None):
+    """
+    Remove a copy of sequence that items occur in exclude.
+
+    :param sequence: sequence of objects
+    :param exclude:  objects to excluded (must support ``in`` check)
+    :param key:      optional function to extract key from item in sequence
+    :return:         list of items not in the excluded
+    """
+    key_func = key if key else lambda x: x
+    result = (s for s in sequence if key_func(s) not in exclude)
+    return set(result) if isinstance(sequence, Set) else list(result)
+
+
+def is_nova_server(resource):
+    """
+    checks resource is a nova server
+    """
+    return isinstance(resource, dict) and "type" in resource and "properties" in resource and resource.get("type") == "OS::Nova::Server"
+