[VVP] Handle intrinsic functions in prop_iterator
[vvp/validation-scripts.git] / ice_validator / tests / helpers.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 #
39
40 """Helpers
41 """
42
43 import os
44 import re
45 import zipfile
46 from collections import defaultdict
47 from typing import Set
48
49 from boltons import funcutils
50 from tests import cached_yaml as yaml
51
52 __path__ = [os.path.dirname(os.path.abspath(__file__))]
53 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
54 RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
55
56 INTRINSIC_FUNCTIONS = [
57     "get_resource",
58     "get_attr",
59     "str_replace",
60     "get_param",
61     "list_join",
62     "get_file",
63     "resource_facade",
64     "Fn::Select",
65     "repeat",
66     "digest",
67     "str_split",
68     "yaql",
69     "map_replace",
70     "map_merge",
71 ]
72
73
74 def is_base_module(template_path):
75     basename = os.path.basename(template_path).lower()
76     name, extension = os.path.splitext(basename)
77     is_yaml = extension in {".yml", ".yaml"}
78     return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
79
80
81 def check_basename_ending(template_type, basename):
82     """
83     return True/False if the template type is matching
84     the filename
85     """
86     if not template_type:
87         return True
88     elif template_type == "volume":
89         return basename.endswith("_volume")
90     else:
91         return not basename.endswith("_volume")
92
93
94 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
95     """
96     get the parsed yaml for a list of yaml files
97     """
98     sections = [] if sections is None else sections
99     parsed_yml_list = []
100     for yaml_file in yaml_files:
101         try:
102             with open(yaml_file) as fh:
103                 yml = yaml.load(fh)
104         except yaml.YAMLError as e:
105             # pylint: disable=superfluous-parens
106             print("Error in %s: %s" % (yaml_file, e))
107             continue
108         if yml:
109             if sections:
110                 for k in yml.keys():
111                     if k not in sections:
112                         del yml[k]
113             parsed_yml_list.append(yml)
114     return parsed_yml_list
115
116
117 def validates(*requirement_ids):
118     """Decorator that tags the test function with one or more requirement IDs.
119
120     Example:
121         >>> @validates('R-12345', 'R-12346')
122         ... def test_something():
123         ...     pass
124         >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
125     """
126     # pylint: disable=missing-docstring
127     def decorator(func):
128         # NOTE: We use a utility here to ensure that function signatures are
129         # maintained because pytest inspects function signatures to inject
130         # fixtures.  I experimented with a few options, but this is the only
131         # library that worked. Other libraries dynamically generated a
132         # function at run-time, and then lost the requirement_ids attribute
133         @funcutils.wraps(func)
134         def wrapper(*args, **kw):
135             return func(*args, **kw)
136
137         wrapper.requirement_ids = requirement_ids
138         return wrapper
139
140     decorator.requirement_ids = requirement_ids
141     return decorator
142
143
144 def categories(*all_of, any_of=None):
145     any_of = set(any_of) if any_of else set()
146     all_of = set(all_of) if all_of else set()
147
148     def decorator(func):
149         @funcutils.wraps(func)
150         def wrapper(*args, **kw):
151             return func(*args, **kw)
152
153         wrapper.all_categories = all_of
154         wrapper.any_categories = any_of
155         return wrapper
156
157     decorator.all_categories = all_of
158     decorator.any_categories = any_of
159     return decorator
160
161
162 def get_environment_pair(heat_template):
163     """Returns a yaml/env pair given a yaml file"""
164     base_dir, filename = os.path.split(heat_template)
165     basename = os.path.splitext(filename)[0]
166     env_template = os.path.join(base_dir, "{}.env".format(basename))
167     if os.path.exists(env_template):
168         with open(heat_template, "r") as fh:
169             yyml = yaml.load(fh)
170         with open(env_template, "r") as fh:
171             eyml = yaml.load(fh)
172
173         environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
174         return environment_pair
175
176     return None
177
178
179 def find_environment_file(yaml_files):
180     """
181     Pass file and recursively step backwards until environment file is found
182
183     :param yaml_files: list or string, start at size 1 and grows recursively
184     :return: corresponding environment file for a file, or None
185     """
186     # sanitize
187     if isinstance(yaml_files, str):
188         yaml_files = [yaml_files]
189
190     yaml_file = yaml_files[-1]
191     filepath, filename = os.path.split(yaml_file)
192
193     environment_pair = get_environment_pair(yaml_file)
194     if environment_pair:
195         return environment_pair
196
197     for file in os.listdir(filepath):
198         fq_name = "{}/{}".format(filepath, file)
199         if fq_name.endswith("yaml") or fq_name.endswith("yml"):
200             if fq_name not in yaml_files:
201                 with open(fq_name) as f:
202                     yml = yaml.load(f)
203                 resources = yml.get("resources", {})
204                 for resource_id, resource in resources.items():
205                     resource_type = resource.get("type", "")
206                     if resource_type == "OS::Heat::ResourceGroup":
207                         resource_type = (
208                             resource.get("properties", {})
209                             .get("resource_def", {})
210                             .get("type", "")
211                         )
212                     # found called nested file
213                     if resource_type == filename:
214                         yaml_files.append(fq_name)
215                         environment_pair = find_environment_file(yaml_files)
216
217     return environment_pair
218
219
220 def load_yaml(yaml_file):
221     """
222     Load the YAML file at the given path.  If the file has previously been
223     loaded, then a cached version will be returned.
224
225     :param yaml_file: path to the YAML file
226     :return: data structure loaded from the YAML file
227     """
228     with open(yaml_file) as fh:
229         return yaml.load(fh)
230
231
232 def traverse(data, search_key, func, path=None):
233     """
234     Traverse the data structure provided via ``data`` looking for occurences
235     of ``search_key``.  When ``search_key`` is found, the value associated
236     with that key is passed to ``func``
237
238     :param data:        arbitrary data structure of dicts and lists
239     :param search_key:  key field to search for
240     :param func:        Callable object that takes two parameters:
241                         * A list representing the path of keys to search_key
242                         * The value associated with the search_key
243     """
244     path = [] if path is None else path
245     if isinstance(data, dict):
246         for key, value in data.items():
247             curr_path = path + [key]
248             if key == search_key:
249                 func(curr_path, value)
250             traverse(value, search_key, func, curr_path)
251     elif isinstance(data, list):
252         for value in data:
253             curr_path = path + [value]
254             if isinstance(value, (dict, list)):
255                 traverse(value, search_key, func, curr_path)
256             elif value == search_key:
257                 func(curr_path, value)
258
259
260 def check_indices(pattern, values, value_type):
261     """
262     Checks that indices associated with the matched prefix start at 0 and
263     increment by 1.  It returns a list of messages for any prefixes that
264     violate the rules.
265
266     :param pattern: Compiled regex that whose first group matches the prefix and
267                     second group matches the index
268     :param values:  sequence of string names that may or may not match the pattern
269     :param name:    Type of value being checked (ex: IP Parameters). This will
270                     be included in the error messages.
271     :return:        List of error messages, empty list if no violations found
272     """
273     if not hasattr(pattern, "match"):
274         raise RuntimeError("Pattern must be a compiled regex")
275
276     prefix_indices = defaultdict(set)
277     for value in values:
278         m = pattern.match(value)
279         if m:
280             prefix_indices[m.group(1)].add(int(m.group(2)))
281
282     invalid_params = []
283     for prefix, indices in prefix_indices.items():
284         indices = sorted(indices)
285         if indices[0] != 0:
286             invalid_params.append(
287                 "{} with prefix {} do not start at 0".format(value_type, prefix)
288             )
289         elif len(indices) - 1 != indices[-1]:
290             invalid_params.append(
291                 (
292                     "Index values of {} with prefix {} do not " + "increment by 1: {}"
293                 ).format(value_type, prefix, indices)
294             )
295     return invalid_params
296
297
298 def get_base_template_from_yaml_files(yaml_files):
299     """Return first filepath to match RE_BASE
300     """
301     for filepath in yaml_files:
302         basename = get_base_template_from_yaml_file(filepath)
303         if basename:
304             return basename
305     return None
306
307
308 def get_base_template_from_yaml_file(yaml_file):
309     (dirname, filename) = os.path.split(yaml_file)
310     files = os.listdir(dirname)
311     for file in files:
312         basename, __ = os.path.splitext(os.path.basename(file))
313         if (
314             (__ == ".yaml" or __ == ".yml")
315             and RE_BASE.search(basename)
316             and basename.find("volume") == -1
317         ):
318             return os.path.join(dirname, "{}{}".format(basename, __))
319     return None
320
321
322 def parameter_type_to_heat_type(parameter):
323     # getting parameter format
324     if isinstance(parameter, list):
325         parameter_type = "comma_delimited_list"
326     elif isinstance(parameter, str):
327         parameter_type = "string"
328     elif isinstance(parameter, dict):
329         parameter_type = "json"
330     elif isinstance(parameter, int) or isinstance(parameter, float):
331         parameter_type = "number"
332     elif isinstance(parameter, bool):
333         parameter_type = "boolean"
334     else:
335         parameter_type = None
336
337     return parameter_type
338
339
340 def prop_iterator(resource, *props):
341     if "properties" in resource:
342         resource = resource.get("properties")
343     props = list(props)
344
345     if isinstance(resource, dict) and any(
346         x for x in INTRINSIC_FUNCTIONS if x in resource
347     ):
348         yield resource
349     else:
350         prop = resource.get(props.pop(0))
351         if isinstance(prop, list):
352             for x in prop:
353                 yield from prop_iterator(x, *props)
354         elif isinstance(prop, dict):
355             yield from prop_iterator(prop, *props)
356
357
358 def get_param(property_value):
359     """
360     Returns the first parameter name from a get_param or None if get_param is
361     not used
362     """
363     if property_value and isinstance(property_value, dict):
364         param = property_value.get("get_param")
365         if param and isinstance(param, list) and len(param) > 0:
366             return param[0]
367         else:
368             return param
369     return None
370
371
372 def get_output_dir(config):
373     """
374     Retrieve the output directory for the reports and create it if necessary
375     :param config: pytest configuration
376     :return: output directory as string
377     """
378     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
379     if not os.path.exists(output_dir):
380         os.makedirs(output_dir, exist_ok=True)
381     return output_dir
382
383
384 def first(seq, predicate, default=None):
385     """
386     Return the first item in sequence that satisfies the callable, predicate, or
387     returns the default if not found.
388
389     :param seq:         iterable sequence of objects
390     :param predicate:   callable that accepts one item from the sequence
391     :param default:     value to return if not found (default is None)
392     :return:            default value if no item satisfies the predicate
393     """
394     return next((i for i in seq if predicate(i)), default)
395
396
397 def check(predicate, message):
398     """
399     Raise a RuntimeError with the provided message if predicate is False.
400
401     Example:
402         check(path.is_file(), "{} must be a file".format(path.as_posix()))
403
404     :param predicate:   boolean condition
405     :param message:     message
406     """
407     if not predicate:
408         raise RuntimeError(message)
409
410
411 def unzip(zip_path, target_dir):
412     """
413     Extracts a Zip archive located at zip_path to target_dir (which will be
414     created if it already exists)
415
416     :param zip_path:    path to valid zip file
417     :param target_dir:  directory to unzip zip_path
418     """
419     check(
420         zipfile.is_zipfile(zip_path),
421         "{} is not a valid zipfile or does not exist".format(zip_path),
422     )
423     archive = zipfile.ZipFile(zip_path)
424     if not os.path.exists(target_dir):
425         os.makedirs(target_dir, exist_ok=True)
426     archive.extractall(path=target_dir)
427
428
429 def remove(sequence, exclude, key=None):
430     """
431     Remove a copy of sequence that items occur in exclude.
432
433     :param sequence: sequence of objects
434     :param exclude:  objects to excluded (must support ``in`` check)
435     :param key:      optional function to extract key from item in sequence
436     :return:         list of items not in the excluded
437     """
438     key_func = key if key else lambda x: x
439     result = (s for s in sequence if key_func(s) not in exclude)
440     return set(result) if isinstance(sequence, Set) else list(result)
441
442
443 def is_nova_server(resource):
444     """
445     checks resource is a nova server
446     """
447     return (
448         isinstance(resource, dict)
449         and "type" in resource
450         and "properties" in resource
451         and resource.get("type") == "OS::Nova::Server"
452     )