import os
import re
+import zipfile
from collections import defaultdict
+from typing import Set
from boltons import funcutils
from tests import cached_yaml as yaml
-VERSION = "1.1.0"
+__path__ = [os.path.dirname(os.path.abspath(__file__))]
+DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
+RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
+
+
+def is_base_module(template_path):
+ basename = os.path.basename(template_path).lower()
+ name, extension = os.path.splitext(basename)
+ is_yaml = extension in {".yml", ".yaml"}
+ return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
def check_basename_ending(template_type, basename):
return decorator
-def categories(*categories):
+def categories(*all_of, any_of=None):
+ any_of = set(any_of) if any_of else set()
+ all_of = set(all_of) if all_of else set()
+
def decorator(func):
@funcutils.wraps(func)
def wrapper(*args, **kw):
return func(*args, **kw)
- wrapper.categories = categories
+ wrapper.all_categories = all_of
+ wrapper.any_categories = any_of
return wrapper
- decorator.categories = categories
+ decorator.all_categories = all_of
+ decorator.any_categories = any_of
return decorator
elif isinstance(data, list):
for value in data:
curr_path = path + [value]
- if isinstance(value, dict):
+ if isinstance(value, (dict, list)):
traverse(value, search_key, func, curr_path)
elif value == search_key:
func(curr_path, value)
return invalid_params
-RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
-
-
def get_base_template_from_yaml_files(yaml_files):
"""Return first filepath to match RE_BASE
"""
parameter_type = "string"
elif isinstance(parameter, dict):
parameter_type = "json"
- elif isinstance(parameter, int):
- parameter_type = "number"
- elif isinstance(parameter, float):
+ elif isinstance(parameter, int) or isinstance(parameter, float):
parameter_type = "number"
elif isinstance(parameter, bool):
parameter_type = "boolean"
def prop_iterator(resource, *props):
- terminators = ["get_resource", "get_attr", "str_replace"]
+ terminators = ["get_resource", "get_attr", "str_replace", "get_param"]
if "properties" in resource:
resource = resource.get("properties")
props = list(props)
- if isinstance(resource, dict) and "get_param" in resource:
- yield resource.get("get_param")
+ if isinstance(resource, dict) and any(x for x in terminators if x in resource):
+ yield resource
else:
prop = resource.get(props.pop(0))
if isinstance(prop, list):
for x in prop:
yield from prop_iterator(x, *props)
- elif isinstance(prop, dict) and not any(x for x in terminators if x in prop):
+ elif isinstance(prop, dict):
yield from prop_iterator(prop, *props)
+
+
+def get_param(property_value):
+ """
+ Returns the first parameter name from a get_param or None if get_param is
+ not used
+ """
+ if property_value and isinstance(property_value, dict):
+ param = property_value.get("get_param")
+ if param and isinstance(param, list) and len(param) > 0:
+ return param[0]
+ else:
+ return param
+ return None
+
+
+def get_output_dir(config):
+ """
+ Retrieve the output directory for the reports and create it if necessary
+ :param config: pytest configuration
+ :return: output directory as string
+ """
+ output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
+ if not os.path.exists(output_dir):
+ os.makedirs(output_dir, exist_ok=True)
+ return output_dir
+
+
+def first(seq, predicate, default=None):
+ """
+ Return the first item in sequence that satisfies the callable, predicate, or
+ returns the default if not found.
+
+ :param seq: iterable sequence of objects
+ :param predicate: callable that accepts one item from the sequence
+ :param default: value to return if not found (default is None)
+ :return: default value if no item satisfies the predicate
+ """
+ return next((i for i in seq if predicate(i)), default)
+
+
+def check(predicate, message):
+ """
+ Raise a RuntimeError with the provided message if predicate is False.
+
+ Example:
+ check(path.is_file(), "{} must be a file".format(path.as_posix()))
+
+ :param predicate: boolean condition
+ :param message: message
+ """
+ if not predicate:
+ raise RuntimeError(message)
+
+
+def unzip(zip_path, target_dir):
+ """
+ Extracts a Zip archive located at zip_path to target_dir (which will be
+ created if it already exists)
+
+ :param zip_path: path to valid zip file
+ :param target_dir: directory to unzip zip_path
+ """
+ check(
+ zipfile.is_zipfile(zip_path),
+ "{} is not a valid zipfile or does not exist".format(zip_path),
+ )
+ archive = zipfile.ZipFile(zip_path)
+ if not os.path.exists(target_dir):
+ os.makedirs(target_dir, exist_ok=True)
+ archive.extractall(path=target_dir)
+
+
+def remove(sequence, exclude, key=None):
+ """
+ Remove a copy of sequence that items occur in exclude.
+
+ :param sequence: sequence of objects
+ :param exclude: objects to excluded (must support ``in`` check)
+ :param key: optional function to extract key from item in sequence
+ :return: list of items not in the excluded
+ """
+ key_func = key if key else lambda x: x
+ result = (s for s in sequence if key_func(s) not in exclude)
+ return set(result) if isinstance(sequence, Set) else list(result)
+
+
+def is_nova_server(resource):
+ """
+ checks resource is a nova server
+ """
+ return isinstance(resource, dict) and "type" in resource and "properties" in resource and resource.get("type") == "OS::Nova::Server"
+