94effed587784725aee5c84971010d509d307999
[vvp/validation-scripts.git] / ice_validator / tests / helpers.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 #
39
40 """Helpers
41 """
42
43 import os
44 import re
45 import zipfile
46 from collections import defaultdict
47 from typing import Set
48
49 from boltons import funcutils
50 from tests import cached_yaml as yaml
51
52 __path__ = [os.path.dirname(os.path.abspath(__file__))]
53 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
54 RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
55
56
57 def is_base_module(template_path):
58     basename = os.path.basename(template_path).lower()
59     name, extension = os.path.splitext(basename)
60     is_yaml = extension in {".yml", ".yaml"}
61     return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
62
63
64 def check_basename_ending(template_type, basename):
65     """
66     return True/False if the template type is matching
67     the filename
68     """
69     if not template_type:
70         return True
71     elif template_type == "volume":
72         return basename.endswith("_volume")
73     else:
74         return not basename.endswith("_volume")
75
76
77 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
78     """
79     get the parsed yaml for a list of yaml files
80     """
81     sections = [] if sections is None else sections
82     parsed_yml_list = []
83     for yaml_file in yaml_files:
84         try:
85             with open(yaml_file) as fh:
86                 yml = yaml.load(fh)
87         except yaml.YAMLError as e:
88             # pylint: disable=superfluous-parens
89             print("Error in %s: %s" % (yaml_file, e))
90             continue
91         if yml:
92             if sections:
93                 for k in yml.keys():
94                     if k not in sections:
95                         del yml[k]
96             parsed_yml_list.append(yml)
97     return parsed_yml_list
98
99
100 def validates(*requirement_ids):
101     """Decorator that tags the test function with one or more requirement IDs.
102
103     Example:
104         >>> @validates('R-12345', 'R-12346')
105         ... def test_something():
106         ...     pass
107         >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
108     """
109     # pylint: disable=missing-docstring
110     def decorator(func):
111         # NOTE: We use a utility here to ensure that function signatures are
112         # maintained because pytest inspects function signatures to inject
113         # fixtures.  I experimented with a few options, but this is the only
114         # library that worked. Other libraries dynamically generated a
115         # function at run-time, and then lost the requirement_ids attribute
116         @funcutils.wraps(func)
117         def wrapper(*args, **kw):
118             return func(*args, **kw)
119
120         wrapper.requirement_ids = requirement_ids
121         return wrapper
122
123     decorator.requirement_ids = requirement_ids
124     return decorator
125
126
127 def categories(*categories):
128     def decorator(func):
129         @funcutils.wraps(func)
130         def wrapper(*args, **kw):
131             return func(*args, **kw)
132
133         wrapper.categories = categories
134         return wrapper
135
136     decorator.categories = categories
137     return decorator
138
139
140 def get_environment_pair(heat_template):
141     """Returns a yaml/env pair given a yaml file"""
142     base_dir, filename = os.path.split(heat_template)
143     basename = os.path.splitext(filename)[0]
144     env_template = os.path.join(base_dir, "{}.env".format(basename))
145     if os.path.exists(env_template):
146         with open(heat_template, "r") as fh:
147             yyml = yaml.load(fh)
148         with open(env_template, "r") as fh:
149             eyml = yaml.load(fh)
150
151         environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
152         return environment_pair
153
154     return None
155
156
157 def find_environment_file(yaml_files):
158     """
159     Pass file and recursively step backwards until environment file is found
160
161     :param yaml_files: list or string, start at size 1 and grows recursively
162     :return: corresponding environment file for a file, or None
163     """
164     # sanitize
165     if isinstance(yaml_files, str):
166         yaml_files = [yaml_files]
167
168     yaml_file = yaml_files[-1]
169     filepath, filename = os.path.split(yaml_file)
170
171     environment_pair = get_environment_pair(yaml_file)
172     if environment_pair:
173         return environment_pair
174
175     for file in os.listdir(filepath):
176         fq_name = "{}/{}".format(filepath, file)
177         if fq_name.endswith("yaml") or fq_name.endswith("yml"):
178             if fq_name not in yaml_files:
179                 with open(fq_name) as f:
180                     yml = yaml.load(f)
181                 resources = yml.get("resources", {})
182                 for resource_id, resource in resources.items():
183                     resource_type = resource.get("type", "")
184                     if resource_type == "OS::Heat::ResourceGroup":
185                         resource_type = (
186                             resource.get("properties", {})
187                             .get("resource_def", {})
188                             .get("type", "")
189                         )
190                     # found called nested file
191                     if resource_type == filename:
192                         yaml_files.append(fq_name)
193                         environment_pair = find_environment_file(yaml_files)
194
195     return environment_pair
196
197
198 def load_yaml(yaml_file):
199     """
200     Load the YAML file at the given path.  If the file has previously been
201     loaded, then a cached version will be returned.
202
203     :param yaml_file: path to the YAML file
204     :return: data structure loaded from the YAML file
205     """
206     with open(yaml_file) as fh:
207         return yaml.load(fh)
208
209
210 def traverse(data, search_key, func, path=None):
211     """
212     Traverse the data structure provided via ``data`` looking for occurences
213     of ``search_key``.  When ``search_key`` is found, the value associated
214     with that key is passed to ``func``
215
216     :param data:        arbitrary data structure of dicts and lists
217     :param search_key:  key field to search for
218     :param func:        Callable object that takes two parameters:
219                         * A list representing the path of keys to search_key
220                         * The value associated with the search_key
221     """
222     path = [] if path is None else path
223     if isinstance(data, dict):
224         for key, value in data.items():
225             curr_path = path + [key]
226             if key == search_key:
227                 func(curr_path, value)
228             traverse(value, search_key, func, curr_path)
229     elif isinstance(data, list):
230         for value in data:
231             curr_path = path + [value]
232             if isinstance(value, (dict, list)):
233                 traverse(value, search_key, func, curr_path)
234             elif value == search_key:
235                 func(curr_path, value)
236
237
238 def check_indices(pattern, values, value_type):
239     """
240     Checks that indices associated with the matched prefix start at 0 and
241     increment by 1.  It returns a list of messages for any prefixes that
242     violate the rules.
243
244     :param pattern: Compiled regex that whose first group matches the prefix and
245                     second group matches the index
246     :param values:  sequence of string names that may or may not match the pattern
247     :param name:    Type of value being checked (ex: IP Parameters). This will
248                     be included in the error messages.
249     :return:        List of error messages, empty list if no violations found
250     """
251     if not hasattr(pattern, "match"):
252         raise RuntimeError("Pattern must be a compiled regex")
253
254     prefix_indices = defaultdict(set)
255     for value in values:
256         m = pattern.match(value)
257         if m:
258             prefix_indices[m.group(1)].add(int(m.group(2)))
259
260     invalid_params = []
261     for prefix, indices in prefix_indices.items():
262         indices = sorted(indices)
263         if indices[0] != 0:
264             invalid_params.append(
265                 "{} with prefix {} do not start at 0".format(value_type, prefix)
266             )
267         elif len(indices) - 1 != indices[-1]:
268             invalid_params.append(
269                 (
270                     "Index values of {} with prefix {} do not " + "increment by 1: {}"
271                 ).format(value_type, prefix, indices)
272             )
273     return invalid_params
274
275
276 def get_base_template_from_yaml_files(yaml_files):
277     """Return first filepath to match RE_BASE
278     """
279     for filepath in yaml_files:
280         basename = get_base_template_from_yaml_file(filepath)
281         if basename:
282             return basename
283     return None
284
285
286 def get_base_template_from_yaml_file(yaml_file):
287     (dirname, filename) = os.path.split(yaml_file)
288     files = os.listdir(dirname)
289     for file in files:
290         basename, __ = os.path.splitext(os.path.basename(file))
291         if (
292             (__ == ".yaml" or __ == ".yml")
293             and RE_BASE.search(basename)
294             and basename.find("volume") == -1
295         ):
296             return os.path.join(dirname, "{}{}".format(basename, __))
297     return None
298
299
300 def parameter_type_to_heat_type(parameter):
301     # getting parameter format
302     if isinstance(parameter, list):
303         parameter_type = "comma_delimited_list"
304     elif isinstance(parameter, str):
305         parameter_type = "string"
306     elif isinstance(parameter, dict):
307         parameter_type = "json"
308     elif isinstance(parameter, int):
309         parameter_type = "number"
310     elif isinstance(parameter, float):
311         parameter_type = "number"
312     elif isinstance(parameter, bool):
313         parameter_type = "boolean"
314     else:
315         parameter_type = None
316
317     return parameter_type
318
319
320 def prop_iterator(resource, *props):
321     terminators = ["get_resource", "get_attr", "str_replace", "get_param"]
322     if "properties" in resource:
323         resource = resource.get("properties")
324     props = list(props)
325
326     if isinstance(resource, dict) and any(x for x in terminators if x in resource):
327         yield resource
328     else:
329         prop = resource.get(props.pop(0))
330         if isinstance(prop, list):
331             for x in prop:
332                 yield from prop_iterator(x, *props)
333         elif isinstance(prop, dict):
334             yield from prop_iterator(prop, *props)
335
336
337 def get_param(property_value):
338     """
339     Returns the first parameter name from a get_param or None if get_param is
340     not used
341     """
342     if property_value and isinstance(property_value, dict):
343         param = property_value.get("get_param")
344         if param and isinstance(param, list) and len(param) > 0:
345             return param[0]
346         else:
347             return param
348     return None
349
350
351 def get_output_dir(config):
352     """
353     Retrieve the output directory for the reports and create it if necessary
354     :param config: pytest configuration
355     :return: output directory as string
356     """
357     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
358     if not os.path.exists(output_dir):
359         os.makedirs(output_dir, exist_ok=True)
360     return output_dir
361
362
363 def first(seq, predicate, default=None):
364     """
365     Return the first item in sequence that satisfies the callable, predicate, or
366     returns the default if not found.
367
368     :param seq:         iterable sequence of objects
369     :param predicate:   callable that accepts one item from the sequence
370     :param default:     value to return if not found (default is None)
371     :return:            default value if no item satisfies the predicate
372     """
373     return next((i for i in seq if predicate(i)), default)
374
375
376 def check(predicate, message):
377     """
378     Raise a RuntimeError with the provided message if predicate is False.
379
380     Example:
381         check(path.is_file(), "{} must be a file".format(path.as_posix()))
382
383     :param predicate:   boolean condition
384     :param message:     message
385     """
386     if not predicate:
387         raise RuntimeError(message)
388
389
390 def unzip(zip_path, target_dir):
391     """
392     Extracts a Zip archive located at zip_path to target_dir (which will be
393     created if it already exists)
394
395     :param zip_path:    path to valid zip file
396     :param target_dir:  directory to unzip zip_path
397     """
398     check(zipfile.is_zipfile(zip_path), "{} is not a valid zipfile or does not exist".format(zip_path))
399     archive = zipfile.ZipFile(zip_path)
400     if not os.path.exists(target_dir):
401         os.makedirs(target_dir, exist_ok=True)
402     archive.extractall(path=target_dir)
403
404
405 def remove(sequence, exclude, key=None):
406     """
407     Remove a copy of sequence that items occur in exclude.
408
409     :param sequence: sequence of objects
410     :param exclude:  objects to excluded (must support ``in`` check)
411     :param key:      optional function to extract key from item in sequence
412     :return:         list of items not in the excluded
413     """
414     key_func = key if key else lambda x: x
415     result = (s for s in sequence if key_func(s) not in exclude)
416     return set(result) if isinstance(sequence, Set) else list(result)