[VVP] Adding preload generation functionality
[vvp/validation-scripts.git] / ice_validator / tests / helpers.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 #
39
40 """Helpers
41 """
42
43 import os
44 import re
45 from collections import defaultdict
46
47 from boltons import funcutils
48 from tests import cached_yaml as yaml
49
50 __path__ = [os.path.dirname(os.path.abspath(__file__))]
51 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
52 RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
53
54
55 def is_base_module(template_path):
56     basename = os.path.basename(template_path).lower()
57     name, extension = os.path.splitext(basename)
58     is_yaml = extension in {".yml", ".yaml"}
59     return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
60
61
62 def check_basename_ending(template_type, basename):
63     """
64     return True/False if the template type is matching
65     the filename
66     """
67     if not template_type:
68         return True
69     elif template_type == "volume":
70         return basename.endswith("_volume")
71     else:
72         return not basename.endswith("_volume")
73
74
75 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
76     """
77     get the parsed yaml for a list of yaml files
78     """
79     sections = [] if sections is None else sections
80     parsed_yml_list = []
81     for yaml_file in yaml_files:
82         try:
83             with open(yaml_file) as fh:
84                 yml = yaml.load(fh)
85         except yaml.YAMLError as e:
86             # pylint: disable=superfluous-parens
87             print("Error in %s: %s" % (yaml_file, e))
88             continue
89         if yml:
90             if sections:
91                 for k in yml.keys():
92                     if k not in sections:
93                         del yml[k]
94             parsed_yml_list.append(yml)
95     return parsed_yml_list
96
97
98 def validates(*requirement_ids):
99     """Decorator that tags the test function with one or more requirement IDs.
100
101     Example:
102         >>> @validates('R-12345', 'R-12346')
103         ... def test_something():
104         ...     pass
105         >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
106     """
107     # pylint: disable=missing-docstring
108     def decorator(func):
109         # NOTE: We use a utility here to ensure that function signatures are
110         # maintained because pytest inspects function signatures to inject
111         # fixtures.  I experimented with a few options, but this is the only
112         # library that worked. Other libraries dynamically generated a
113         # function at run-time, and then lost the requirement_ids attribute
114         @funcutils.wraps(func)
115         def wrapper(*args, **kw):
116             return func(*args, **kw)
117
118         wrapper.requirement_ids = requirement_ids
119         return wrapper
120
121     decorator.requirement_ids = requirement_ids
122     return decorator
123
124
125 def categories(*categories):
126     def decorator(func):
127         @funcutils.wraps(func)
128         def wrapper(*args, **kw):
129             return func(*args, **kw)
130
131         wrapper.categories = categories
132         return wrapper
133
134     decorator.categories = categories
135     return decorator
136
137
138 def get_environment_pair(heat_template):
139     """Returns a yaml/env pair given a yaml file"""
140     base_dir, filename = os.path.split(heat_template)
141     basename = os.path.splitext(filename)[0]
142     env_template = os.path.join(base_dir, "{}.env".format(basename))
143     if os.path.exists(env_template):
144         with open(heat_template, "r") as fh:
145             yyml = yaml.load(fh)
146         with open(env_template, "r") as fh:
147             eyml = yaml.load(fh)
148
149         environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
150         return environment_pair
151
152     return None
153
154
155 def find_environment_file(yaml_files):
156     """
157     Pass file and recursively step backwards until environment file is found
158
159     :param yaml_files: list or string, start at size 1 and grows recursively
160     :return: corresponding environment file for a file, or None
161     """
162     # sanitize
163     if isinstance(yaml_files, str):
164         yaml_files = [yaml_files]
165
166     yaml_file = yaml_files[-1]
167     filepath, filename = os.path.split(yaml_file)
168
169     environment_pair = get_environment_pair(yaml_file)
170     if environment_pair:
171         return environment_pair
172
173     for file in os.listdir(filepath):
174         fq_name = "{}/{}".format(filepath, file)
175         if fq_name.endswith("yaml") or fq_name.endswith("yml"):
176             if fq_name not in yaml_files:
177                 with open(fq_name) as f:
178                     yml = yaml.load(f)
179                 resources = yml.get("resources", {})
180                 for resource_id, resource in resources.items():
181                     resource_type = resource.get("type", "")
182                     if resource_type == "OS::Heat::ResourceGroup":
183                         resource_type = (
184                             resource.get("properties", {})
185                             .get("resource_def", {})
186                             .get("type", "")
187                         )
188                     # found called nested file
189                     if resource_type == filename:
190                         yaml_files.append(fq_name)
191                         environment_pair = find_environment_file(yaml_files)
192
193     return environment_pair
194
195
196 def load_yaml(yaml_file):
197     """
198     Load the YAML file at the given path.  If the file has previously been
199     loaded, then a cached version will be returned.
200
201     :param yaml_file: path to the YAML file
202     :return: data structure loaded from the YAML file
203     """
204     with open(yaml_file) as fh:
205         return yaml.load(fh)
206
207
208 def traverse(data, search_key, func, path=None):
209     """
210     Traverse the data structure provided via ``data`` looking for occurences
211     of ``search_key``.  When ``search_key`` is found, the value associated
212     with that key is passed to ``func``
213
214     :param data:        arbitrary data structure of dicts and lists
215     :param search_key:  key field to search for
216     :param func:        Callable object that takes two parameters:
217                         * A list representing the path of keys to search_key
218                         * The value associated with the search_key
219     """
220     path = [] if path is None else path
221     if isinstance(data, dict):
222         for key, value in data.items():
223             curr_path = path + [key]
224             if key == search_key:
225                 func(curr_path, value)
226             traverse(value, search_key, func, curr_path)
227     elif isinstance(data, list):
228         for value in data:
229             curr_path = path + [value]
230             if isinstance(value, (dict, list)):
231                 traverse(value, search_key, func, curr_path)
232             elif value == search_key:
233                 func(curr_path, value)
234
235
236 def check_indices(pattern, values, value_type):
237     """
238     Checks that indices associated with the matched prefix start at 0 and
239     increment by 1.  It returns a list of messages for any prefixes that
240     violate the rules.
241
242     :param pattern: Compiled regex that whose first group matches the prefix and
243                     second group matches the index
244     :param values:  sequence of string names that may or may not match the pattern
245     :param name:    Type of value being checked (ex: IP Parameters). This will
246                     be included in the error messages.
247     :return:        List of error messages, empty list if no violations found
248     """
249     if not hasattr(pattern, "match"):
250         raise RuntimeError("Pattern must be a compiled regex")
251
252     prefix_indices = defaultdict(set)
253     for value in values:
254         m = pattern.match(value)
255         if m:
256             prefix_indices[m.group(1)].add(int(m.group(2)))
257
258     invalid_params = []
259     for prefix, indices in prefix_indices.items():
260         indices = sorted(indices)
261         if indices[0] != 0:
262             invalid_params.append(
263                 "{} with prefix {} do not start at 0".format(value_type, prefix)
264             )
265         elif len(indices) - 1 != indices[-1]:
266             invalid_params.append(
267                 (
268                     "Index values of {} with prefix {} do not " + "increment by 1: {}"
269                 ).format(value_type, prefix, indices)
270             )
271     return invalid_params
272
273
274 def get_base_template_from_yaml_files(yaml_files):
275     """Return first filepath to match RE_BASE
276     """
277     for filepath in yaml_files:
278         basename = get_base_template_from_yaml_file(filepath)
279         if basename:
280             return basename
281     return None
282
283
284 def get_base_template_from_yaml_file(yaml_file):
285     (dirname, filename) = os.path.split(yaml_file)
286     files = os.listdir(dirname)
287     for file in files:
288         basename, __ = os.path.splitext(os.path.basename(file))
289         if (
290             (__ == ".yaml" or __ == ".yml")
291             and RE_BASE.search(basename)
292             and basename.find("volume") == -1
293         ):
294             return os.path.join(dirname, "{}{}".format(basename, __))
295     return None
296
297
298 def parameter_type_to_heat_type(parameter):
299     # getting parameter format
300     if isinstance(parameter, list):
301         parameter_type = "comma_delimited_list"
302     elif isinstance(parameter, str):
303         parameter_type = "string"
304     elif isinstance(parameter, dict):
305         parameter_type = "json"
306     elif isinstance(parameter, int):
307         parameter_type = "number"
308     elif isinstance(parameter, float):
309         parameter_type = "number"
310     elif isinstance(parameter, bool):
311         parameter_type = "boolean"
312     else:
313         parameter_type = None
314
315     return parameter_type
316
317
318 def prop_iterator(resource, *props):
319     terminators = ["get_resource", "get_attr", "str_replace", "get_param"]
320     if "properties" in resource:
321         resource = resource.get("properties")
322     props = list(props)
323
324     if isinstance(resource, dict) and any(x for x in terminators if x in resource):
325         yield resource
326     else:
327         prop = resource.get(props.pop(0))
328         if isinstance(prop, list):
329             for x in prop:
330                 yield from prop_iterator(x, *props)
331         elif isinstance(prop, dict):
332             yield from prop_iterator(prop, *props)
333
334
335 def get_param(property_value):
336     """
337     Returns the first parameter name from a get_param or None if get_param is
338     not used
339     """
340     if property_value and isinstance(property_value, dict):
341         param = property_value.get("get_param")
342         if param and isinstance(param, list) and len(param) > 0:
343             return param[0]
344         else:
345             return param
346     return None
347
348
349 def get_output_dir(config):
350     """
351     Retrieve the output directory for the reports and create it if necessary
352     :param config: pytest configuration
353     :return: output directory as string
354     """
355     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
356     if not os.path.exists(output_dir):
357         os.makedirs(output_dir, exist_ok=True)
358     return output_dir