Aligned test with updated R-610030
[vvp/validation-scripts.git] / ice_validator / tests / helpers.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 #
39
40 """Helpers
41 """
42
43 import os
44 import re
45 import zipfile
46 from collections import defaultdict
47 from typing import Set
48
49 from boltons import funcutils
50 from tests import cached_yaml as yaml
51
52 __path__ = [os.path.dirname(os.path.abspath(__file__))]
53 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
54 RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
55
56 INTRINSIC_FUNCTIONS = [
57     "get_resource",
58     "get_attr",
59     "str_replace",
60     "get_param",
61     "list_join",
62     "get_file",
63     "resource_facade",
64     "Fn::Select",
65     "repeat",
66     "digest",
67     "str_split",
68     "yaql",
69     "map_replace",
70     "map_merge",
71 ]
72
73
74 def is_base_module(template_path):
75     basename = os.path.basename(template_path).lower()
76     name, extension = os.path.splitext(basename)
77     is_yaml = extension in {".yml", ".yaml"}
78     return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
79
80
81 def check_basename_ending(template_type, basename):
82     """
83     return True/False if the template type is matching
84     the filename
85     """
86     if not template_type:
87         return True
88     elif template_type == "volume":
89         return basename.endswith("_volume")
90     else:
91         return not basename.endswith("_volume")
92
93
94 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
95     """
96     get the parsed yaml for a list of yaml files
97     """
98     sections = [] if sections is None else sections
99     parsed_yml_list = []
100     for yaml_file in yaml_files:
101         try:
102             with open(yaml_file) as fh:
103                 yml = yaml.load(fh)
104         except yaml.YAMLError as e:
105             # pylint: disable=superfluous-parens
106             print("Error in %s: %s" % (yaml_file, e))
107             continue
108         if yml:
109             if sections:
110                 for k in yml.keys():
111                     if k not in sections:
112                         del yml[k]
113             parsed_yml_list.append(yml)
114     return parsed_yml_list
115
116
117 def validates(*requirement_ids):
118     """Decorator that tags the test function with one or more requirement IDs.
119
120     Example:
121         >>> @validates('R-12345', 'R-12346')
122         ... def test_something():
123         ...     pass
124         >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
125     """
126     # pylint: disable=missing-docstring
127     def decorator(func):
128         # NOTE: We use a utility here to ensure that function signatures are
129         # maintained because pytest inspects function signatures to inject
130         # fixtures.  I experimented with a few options, but this is the only
131         # library that worked. Other libraries dynamically generated a
132         # function at run-time, and then lost the requirement_ids attribute
133         @funcutils.wraps(func)
134         def wrapper(*args, **kw):
135             return func(*args, **kw)
136
137         wrapper.requirement_ids = requirement_ids
138         return wrapper
139
140     decorator.requirement_ids = requirement_ids
141     return decorator
142
143
144 def categories(*all_of, any_of=None):
145     any_of = set(any_of) if any_of else set()
146     all_of = set(all_of) if all_of else set()
147
148     def decorator(func):
149         @funcutils.wraps(func)
150         def wrapper(*args, **kw):
151             return func(*args, **kw)
152
153         wrapper.all_categories = all_of
154         wrapper.any_categories = any_of
155         return wrapper
156
157     decorator.all_categories = all_of
158     decorator.any_categories = any_of
159     return decorator
160
161
162 def get_environment_pair(heat_template):
163     """Returns a yaml/env pair given a yaml file"""
164     base_dir, filename = os.path.split(heat_template)
165     basename = os.path.splitext(filename)[0]
166     env_template = os.path.join(base_dir, "{}.env".format(basename))
167     if os.path.exists(env_template):
168         with open(heat_template, "r") as fh:
169             yyml = yaml.load(fh)
170         with open(env_template, "r") as fh:
171             eyml = yaml.load(fh)
172
173         environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
174         return environment_pair
175
176     return None
177
178
179 def find_environment_file(yaml_files):
180     """
181     Pass file and recursively step backwards until environment file is found
182
183     :param yaml_files: list or string, start at size 1 and grows recursively
184     :return: corresponding environment file for a file, or None
185     """
186     # sanitize
187     if isinstance(yaml_files, str):
188         yaml_files = [yaml_files]
189
190     yaml_file = yaml_files[-1]
191     filepath, filename = os.path.split(yaml_file)
192
193     environment_pair = get_environment_pair(yaml_file)
194     if environment_pair:
195         return environment_pair
196
197     for file in os.listdir(filepath):
198         fq_name = "{}/{}".format(filepath, file)
199         if fq_name.endswith("yaml") or fq_name.endswith("yml"):
200             if fq_name not in yaml_files:
201                 with open(fq_name) as f:
202                     yml = yaml.load(f)
203                 resources = yml.get("resources", {})
204                 for resource_id, resource in resources.items():
205                     resource_type = resource.get("type", "")
206                     if resource_type == "OS::Heat::ResourceGroup":
207                         resource_type = (
208                             resource.get("properties", {})
209                             .get("resource_def", {})
210                             .get("type", "")
211                         )
212                     # found called nested file
213                     if resource_type == filename:
214                         yaml_files.append(fq_name)
215                         environment_pair = find_environment_file(yaml_files)
216
217     return environment_pair
218
219
220 def load_yaml(yaml_file):
221     """
222     Load the YAML file at the given path.  If the file has previously been
223     loaded, then a cached version will be returned.
224
225     :param yaml_file: path to the YAML file
226     :return: data structure loaded from the YAML file
227     """
228     with open(yaml_file) as fh:
229         return yaml.load(fh)
230
231
232 def traverse(data, search_key, func, path=None):
233     """
234     Traverse the data structure provided via ``data`` looking for occurences
235     of ``search_key``.  When ``search_key`` is found, the value associated
236     with that key is passed to ``func``
237
238     :param data:        arbitrary data structure of dicts and lists
239     :param search_key:  key field to search for
240     :param func:        Callable object that takes two parameters:
241                         * A list representing the path of keys to search_key
242                         * The value associated with the search_key
243     """
244     path = [] if path is None else path
245     if isinstance(data, dict):
246         for key, value in data.items():
247             curr_path = path + [key]
248             if key == search_key:
249                 func(curr_path, value)
250             traverse(value, search_key, func, curr_path)
251     elif isinstance(data, list):
252         for value in data:
253             curr_path = path + [value]
254             if isinstance(value, (dict, list)):
255                 traverse(value, search_key, func, curr_path)
256             elif value == search_key:
257                 func(curr_path, value)
258     elif search_key == data:
259         curr_path = path + [data]
260         func(curr_path, data)
261
262
263 def check_indices(pattern, values, value_type):
264     """
265     Checks that indices associated with the matched prefix start at 0 and
266     increment by 1.  It returns a list of messages for any prefixes that
267     violate the rules.
268
269     :param pattern: Compiled regex that whose first group matches the prefix and
270                     second group matches the index
271     :param values:  sequence of string names that may or may not match the pattern
272     :param name:    Type of value being checked (ex: IP Parameters). This will
273                     be included in the error messages.
274     :return:        List of error messages, empty list if no violations found
275     """
276     if not hasattr(pattern, "match"):
277         raise RuntimeError("Pattern must be a compiled regex")
278
279     prefix_indices = defaultdict(set)
280     for value in values:
281         m = pattern.match(value)
282         if m:
283             prefix_indices[m.group(1)].add(int(m.group(2)))
284
285     invalid_params = []
286     for prefix, indices in prefix_indices.items():
287         indices = sorted(indices)
288         if indices[0] != 0:
289             invalid_params.append(
290                 "{} with prefix {} do not start at 0".format(value_type, prefix)
291             )
292         elif len(indices) - 1 != indices[-1]:
293             invalid_params.append(
294                 (
295                     "Index values of {} with prefix {} do not " + "increment by 1: {}"
296                 ).format(value_type, prefix, indices)
297             )
298     return invalid_params
299
300
301 def get_base_template_from_yaml_files(yaml_files):
302     """Return first filepath to match RE_BASE
303     """
304     for filepath in yaml_files:
305         basename = get_base_template_from_yaml_file(filepath)
306         if basename:
307             return basename
308     return None
309
310
311 def get_base_template_from_yaml_file(yaml_file):
312     (dirname, filename) = os.path.split(yaml_file)
313     files = os.listdir(dirname)
314     for file in files:
315         basename, __ = os.path.splitext(os.path.basename(file))
316         if (
317             (__ == ".yaml" or __ == ".yml")
318             and RE_BASE.search(basename)
319             and basename.find("volume") == -1
320         ):
321             return os.path.join(dirname, "{}{}".format(basename, __))
322     return None
323
324
325 def parameter_type_to_heat_type(parameter):
326     # getting parameter format
327     if isinstance(parameter, list):
328         parameter_type = "comma_delimited_list"
329     elif isinstance(parameter, str):
330         parameter_type = "string"
331     elif isinstance(parameter, dict):
332         parameter_type = "json"
333     elif isinstance(parameter, int) or isinstance(parameter, float):
334         parameter_type = "number"
335     elif isinstance(parameter, bool):
336         parameter_type = "boolean"
337     else:
338         parameter_type = None
339
340     return parameter_type
341
342
343 def prop_iterator(resource, *props):
344     if "properties" in resource:
345         resource = resource.get("properties")
346     props = list(props)
347
348     if isinstance(resource, dict) and any(
349         x for x in INTRINSIC_FUNCTIONS if x in resource
350     ):
351         yield resource
352     else:
353         prop = resource.get(props.pop(0))
354         if isinstance(prop, list):
355             for x in prop:
356                 yield from prop_iterator(x, *props)
357         elif isinstance(prop, dict):
358             yield from prop_iterator(prop, *props)
359
360
361 def get_param(property_value):
362     """
363     Returns the first parameter name from a get_param or None if get_param is
364     not used
365     """
366     if property_value and isinstance(property_value, dict):
367         param = property_value.get("get_param")
368         if param and isinstance(param, list) and len(param) > 0:
369             return param[0]
370         else:
371             return param
372     return None
373
374
375 def get_output_dir(config):
376     """
377     Retrieve the output directory for the reports and create it if necessary
378     :param config: pytest configuration
379     :return: output directory as string
380     """
381     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
382     if not os.path.exists(output_dir):
383         os.makedirs(output_dir, exist_ok=True)
384     return output_dir
385
386
387 def first(seq, predicate, default=None):
388     """
389     Return the first item in sequence that satisfies the callable, predicate, or
390     returns the default if not found.
391
392     :param seq:         iterable sequence of objects
393     :param predicate:   callable that accepts one item from the sequence
394     :param default:     value to return if not found (default is None)
395     :return:            default value if no item satisfies the predicate
396     """
397     return next((i for i in seq if predicate(i)), default)
398
399
400 def check(predicate, message):
401     """
402     Raise a RuntimeError with the provided message if predicate is False.
403
404     Example:
405         check(path.is_file(), "{} must be a file".format(path.as_posix()))
406
407     :param predicate:   boolean condition
408     :param message:     message
409     """
410     if not predicate:
411         raise RuntimeError(message)
412
413
414 def unzip(zip_path, target_dir):
415     """
416     Extracts a Zip archive located at zip_path to target_dir (which will be
417     created if it already exists)
418
419     :param zip_path:    path to valid zip file
420     :param target_dir:  directory to unzip zip_path
421     """
422     check(
423         zipfile.is_zipfile(zip_path),
424         "{} is not a valid zipfile or does not exist".format(zip_path),
425     )
426     archive = zipfile.ZipFile(zip_path)
427     if not os.path.exists(target_dir):
428         os.makedirs(target_dir, exist_ok=True)
429     archive.extractall(path=target_dir)
430
431
432 def remove(sequence, exclude, key=None):
433     """
434     Remove a copy of sequence that items occur in exclude.
435
436     :param sequence: sequence of objects
437     :param exclude:  objects to excluded (must support ``in`` check)
438     :param key:      optional function to extract key from item in sequence
439     :return:         list of items not in the excluded
440     """
441     key_func = key if key else lambda x: x
442     result = (s for s in sequence if key_func(s) not in exclude)
443     return set(result) if isinstance(sequence, Set) else list(result)
444
445
446 def is_nova_server(resource):
447     """
448     checks resource is a nova server
449     """
450     return (
451         isinstance(resource, dict)
452         and "type" in resource
453         and "properties" in resource
454         and resource.get("type") == "OS::Nova::Server"
455     )