424dde15e7b99d98425daf3b0e1170d3cf203013
[vvp/validation-scripts.git] / ice_validator / tests / helpers.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 #
39
40 """Helpers
41 """
42
43 import os
44 import re
45 import zipfile
46 from collections import defaultdict
47 from typing import Set
48
49 from boltons import funcutils
50 from tests import cached_yaml as yaml
51
52 __path__ = [os.path.dirname(os.path.abspath(__file__))]
53 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
54 RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
55
56
57 def is_base_module(template_path):
58     basename = os.path.basename(template_path).lower()
59     name, extension = os.path.splitext(basename)
60     is_yaml = extension in {".yml", ".yaml"}
61     return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
62
63
64 def check_basename_ending(template_type, basename):
65     """
66     return True/False if the template type is matching
67     the filename
68     """
69     if not template_type:
70         return True
71     elif template_type == "volume":
72         return basename.endswith("_volume")
73     else:
74         return not basename.endswith("_volume")
75
76
77 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
78     """
79     get the parsed yaml for a list of yaml files
80     """
81     sections = [] if sections is None else sections
82     parsed_yml_list = []
83     for yaml_file in yaml_files:
84         try:
85             with open(yaml_file) as fh:
86                 yml = yaml.load(fh)
87         except yaml.YAMLError as e:
88             # pylint: disable=superfluous-parens
89             print("Error in %s: %s" % (yaml_file, e))
90             continue
91         if yml:
92             if sections:
93                 for k in yml.keys():
94                     if k not in sections:
95                         del yml[k]
96             parsed_yml_list.append(yml)
97     return parsed_yml_list
98
99
100 def validates(*requirement_ids):
101     """Decorator that tags the test function with one or more requirement IDs.
102
103     Example:
104         >>> @validates('R-12345', 'R-12346')
105         ... def test_something():
106         ...     pass
107         >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
108     """
109     # pylint: disable=missing-docstring
110     def decorator(func):
111         # NOTE: We use a utility here to ensure that function signatures are
112         # maintained because pytest inspects function signatures to inject
113         # fixtures.  I experimented with a few options, but this is the only
114         # library that worked. Other libraries dynamically generated a
115         # function at run-time, and then lost the requirement_ids attribute
116         @funcutils.wraps(func)
117         def wrapper(*args, **kw):
118             return func(*args, **kw)
119
120         wrapper.requirement_ids = requirement_ids
121         return wrapper
122
123     decorator.requirement_ids = requirement_ids
124     return decorator
125
126
127 def categories(*all_of, any_of=None):
128     any_of = set(any_of) if any_of else set()
129     all_of = set(all_of) if all_of else set()
130
131     def decorator(func):
132         @funcutils.wraps(func)
133         def wrapper(*args, **kw):
134             return func(*args, **kw)
135
136         wrapper.all_categories = all_of
137         wrapper.any_categories = any_of
138         return wrapper
139
140     decorator.all_categories = all_of
141     decorator.any_categories = any_of
142     return decorator
143
144
145 def get_environment_pair(heat_template):
146     """Returns a yaml/env pair given a yaml file"""
147     base_dir, filename = os.path.split(heat_template)
148     basename = os.path.splitext(filename)[0]
149     env_template = os.path.join(base_dir, "{}.env".format(basename))
150     if os.path.exists(env_template):
151         with open(heat_template, "r") as fh:
152             yyml = yaml.load(fh)
153         with open(env_template, "r") as fh:
154             eyml = yaml.load(fh)
155
156         environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
157         return environment_pair
158
159     return None
160
161
162 def find_environment_file(yaml_files):
163     """
164     Pass file and recursively step backwards until environment file is found
165
166     :param yaml_files: list or string, start at size 1 and grows recursively
167     :return: corresponding environment file for a file, or None
168     """
169     # sanitize
170     if isinstance(yaml_files, str):
171         yaml_files = [yaml_files]
172
173     yaml_file = yaml_files[-1]
174     filepath, filename = os.path.split(yaml_file)
175
176     environment_pair = get_environment_pair(yaml_file)
177     if environment_pair:
178         return environment_pair
179
180     for file in os.listdir(filepath):
181         fq_name = "{}/{}".format(filepath, file)
182         if fq_name.endswith("yaml") or fq_name.endswith("yml"):
183             if fq_name not in yaml_files:
184                 with open(fq_name) as f:
185                     yml = yaml.load(f)
186                 resources = yml.get("resources", {})
187                 for resource_id, resource in resources.items():
188                     resource_type = resource.get("type", "")
189                     if resource_type == "OS::Heat::ResourceGroup":
190                         resource_type = (
191                             resource.get("properties", {})
192                             .get("resource_def", {})
193                             .get("type", "")
194                         )
195                     # found called nested file
196                     if resource_type == filename:
197                         yaml_files.append(fq_name)
198                         environment_pair = find_environment_file(yaml_files)
199
200     return environment_pair
201
202
203 def load_yaml(yaml_file):
204     """
205     Load the YAML file at the given path.  If the file has previously been
206     loaded, then a cached version will be returned.
207
208     :param yaml_file: path to the YAML file
209     :return: data structure loaded from the YAML file
210     """
211     with open(yaml_file) as fh:
212         return yaml.load(fh)
213
214
215 def traverse(data, search_key, func, path=None):
216     """
217     Traverse the data structure provided via ``data`` looking for occurences
218     of ``search_key``.  When ``search_key`` is found, the value associated
219     with that key is passed to ``func``
220
221     :param data:        arbitrary data structure of dicts and lists
222     :param search_key:  key field to search for
223     :param func:        Callable object that takes two parameters:
224                         * A list representing the path of keys to search_key
225                         * The value associated with the search_key
226     """
227     path = [] if path is None else path
228     if isinstance(data, dict):
229         for key, value in data.items():
230             curr_path = path + [key]
231             if key == search_key:
232                 func(curr_path, value)
233             traverse(value, search_key, func, curr_path)
234     elif isinstance(data, list):
235         for value in data:
236             curr_path = path + [value]
237             if isinstance(value, (dict, list)):
238                 traverse(value, search_key, func, curr_path)
239             elif value == search_key:
240                 func(curr_path, value)
241
242
243 def check_indices(pattern, values, value_type):
244     """
245     Checks that indices associated with the matched prefix start at 0 and
246     increment by 1.  It returns a list of messages for any prefixes that
247     violate the rules.
248
249     :param pattern: Compiled regex that whose first group matches the prefix and
250                     second group matches the index
251     :param values:  sequence of string names that may or may not match the pattern
252     :param name:    Type of value being checked (ex: IP Parameters). This will
253                     be included in the error messages.
254     :return:        List of error messages, empty list if no violations found
255     """
256     if not hasattr(pattern, "match"):
257         raise RuntimeError("Pattern must be a compiled regex")
258
259     prefix_indices = defaultdict(set)
260     for value in values:
261         m = pattern.match(value)
262         if m:
263             prefix_indices[m.group(1)].add(int(m.group(2)))
264
265     invalid_params = []
266     for prefix, indices in prefix_indices.items():
267         indices = sorted(indices)
268         if indices[0] != 0:
269             invalid_params.append(
270                 "{} with prefix {} do not start at 0".format(value_type, prefix)
271             )
272         elif len(indices) - 1 != indices[-1]:
273             invalid_params.append(
274                 (
275                     "Index values of {} with prefix {} do not " + "increment by 1: {}"
276                 ).format(value_type, prefix, indices)
277             )
278     return invalid_params
279
280
281 def get_base_template_from_yaml_files(yaml_files):
282     """Return first filepath to match RE_BASE
283     """
284     for filepath in yaml_files:
285         basename = get_base_template_from_yaml_file(filepath)
286         if basename:
287             return basename
288     return None
289
290
291 def get_base_template_from_yaml_file(yaml_file):
292     (dirname, filename) = os.path.split(yaml_file)
293     files = os.listdir(dirname)
294     for file in files:
295         basename, __ = os.path.splitext(os.path.basename(file))
296         if (
297             (__ == ".yaml" or __ == ".yml")
298             and RE_BASE.search(basename)
299             and basename.find("volume") == -1
300         ):
301             return os.path.join(dirname, "{}{}".format(basename, __))
302     return None
303
304
305 def parameter_type_to_heat_type(parameter):
306     # getting parameter format
307     if isinstance(parameter, list):
308         parameter_type = "comma_delimited_list"
309     elif isinstance(parameter, str):
310         parameter_type = "string"
311     elif isinstance(parameter, dict):
312         parameter_type = "json"
313     elif isinstance(parameter, int) or isinstance(parameter, float):
314         parameter_type = "number"
315     elif isinstance(parameter, bool):
316         parameter_type = "boolean"
317     else:
318         parameter_type = None
319
320     return parameter_type
321
322
323 def prop_iterator(resource, *props):
324     terminators = ["get_resource", "get_attr", "str_replace", "get_param"]
325     if "properties" in resource:
326         resource = resource.get("properties")
327     props = list(props)
328
329     if isinstance(resource, dict) and any(x for x in terminators if x in resource):
330         yield resource
331     else:
332         prop = resource.get(props.pop(0))
333         if isinstance(prop, list):
334             for x in prop:
335                 yield from prop_iterator(x, *props)
336         elif isinstance(prop, dict):
337             yield from prop_iterator(prop, *props)
338
339
340 def get_param(property_value):
341     """
342     Returns the first parameter name from a get_param or None if get_param is
343     not used
344     """
345     if property_value and isinstance(property_value, dict):
346         param = property_value.get("get_param")
347         if param and isinstance(param, list) and len(param) > 0:
348             return param[0]
349         else:
350             return param
351     return None
352
353
354 def get_output_dir(config):
355     """
356     Retrieve the output directory for the reports and create it if necessary
357     :param config: pytest configuration
358     :return: output directory as string
359     """
360     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
361     if not os.path.exists(output_dir):
362         os.makedirs(output_dir, exist_ok=True)
363     return output_dir
364
365
366 def first(seq, predicate, default=None):
367     """
368     Return the first item in sequence that satisfies the callable, predicate, or
369     returns the default if not found.
370
371     :param seq:         iterable sequence of objects
372     :param predicate:   callable that accepts one item from the sequence
373     :param default:     value to return if not found (default is None)
374     :return:            default value if no item satisfies the predicate
375     """
376     return next((i for i in seq if predicate(i)), default)
377
378
379 def check(predicate, message):
380     """
381     Raise a RuntimeError with the provided message if predicate is False.
382
383     Example:
384         check(path.is_file(), "{} must be a file".format(path.as_posix()))
385
386     :param predicate:   boolean condition
387     :param message:     message
388     """
389     if not predicate:
390         raise RuntimeError(message)
391
392
393 def unzip(zip_path, target_dir):
394     """
395     Extracts a Zip archive located at zip_path to target_dir (which will be
396     created if it already exists)
397
398     :param zip_path:    path to valid zip file
399     :param target_dir:  directory to unzip zip_path
400     """
401     check(
402         zipfile.is_zipfile(zip_path),
403         "{} is not a valid zipfile or does not exist".format(zip_path),
404     )
405     archive = zipfile.ZipFile(zip_path)
406     if not os.path.exists(target_dir):
407         os.makedirs(target_dir, exist_ok=True)
408     archive.extractall(path=target_dir)
409
410
411 def remove(sequence, exclude, key=None):
412     """
413     Remove a copy of sequence that items occur in exclude.
414
415     :param sequence: sequence of objects
416     :param exclude:  objects to excluded (must support ``in`` check)
417     :param key:      optional function to extract key from item in sequence
418     :return:         list of items not in the excluded
419     """
420     key_func = key if key else lambda x: x
421     result = (s for s in sequence if key_func(s) not in exclude)
422     return set(result) if isinstance(sequence, Set) else list(result)
423
424
425 def is_nova_server(resource):
426     """
427     checks resource is a nova server
428     """
429     return isinstance(resource, dict) and "type" in resource and "properties" in resource and resource.get("type") == "OS::Nova::Server"
430