added Is_nova_server method to helpers
[vvp/validation-scripts.git] / ice_validator / tests / helpers.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 #
39
40 """Helpers
41 """
42
43 import os
44 import re
45 import zipfile
46 from collections import defaultdict
47 from typing import Set
48
49 from boltons import funcutils
50 from tests import cached_yaml as yaml
51
52 __path__ = [os.path.dirname(os.path.abspath(__file__))]
53 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
54 RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
55
56
57 def is_base_module(template_path):
58     basename = os.path.basename(template_path).lower()
59     name, extension = os.path.splitext(basename)
60     is_yaml = extension in {".yml", ".yaml"}
61     return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
62
63
64 def check_basename_ending(template_type, basename):
65     """
66     return True/False if the template type is matching
67     the filename
68     """
69     if not template_type:
70         return True
71     elif template_type == "volume":
72         return basename.endswith("_volume")
73     else:
74         return not basename.endswith("_volume")
75
76
77 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
78     """
79     get the parsed yaml for a list of yaml files
80     """
81     sections = [] if sections is None else sections
82     parsed_yml_list = []
83     for yaml_file in yaml_files:
84         try:
85             with open(yaml_file) as fh:
86                 yml = yaml.load(fh)
87         except yaml.YAMLError as e:
88             # pylint: disable=superfluous-parens
89             print("Error in %s: %s" % (yaml_file, e))
90             continue
91         if yml:
92             if sections:
93                 for k in yml.keys():
94                     if k not in sections:
95                         del yml[k]
96             parsed_yml_list.append(yml)
97     return parsed_yml_list
98
99
100 def validates(*requirement_ids):
101     """Decorator that tags the test function with one or more requirement IDs.
102
103     Example:
104         >>> @validates('R-12345', 'R-12346')
105         ... def test_something():
106         ...     pass
107         >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
108     """
109     # pylint: disable=missing-docstring
110     def decorator(func):
111         # NOTE: We use a utility here to ensure that function signatures are
112         # maintained because pytest inspects function signatures to inject
113         # fixtures.  I experimented with a few options, but this is the only
114         # library that worked. Other libraries dynamically generated a
115         # function at run-time, and then lost the requirement_ids attribute
116         @funcutils.wraps(func)
117         def wrapper(*args, **kw):
118             return func(*args, **kw)
119
120         wrapper.requirement_ids = requirement_ids
121         return wrapper
122
123     decorator.requirement_ids = requirement_ids
124     return decorator
125
126
127 def categories(*categories):
128     def decorator(func):
129         @funcutils.wraps(func)
130         def wrapper(*args, **kw):
131             return func(*args, **kw)
132
133         wrapper.categories = categories
134         return wrapper
135
136     decorator.categories = categories
137     return decorator
138
139
140 def get_environment_pair(heat_template):
141     """Returns a yaml/env pair given a yaml file"""
142     base_dir, filename = os.path.split(heat_template)
143     basename = os.path.splitext(filename)[0]
144     env_template = os.path.join(base_dir, "{}.env".format(basename))
145     if os.path.exists(env_template):
146         with open(heat_template, "r") as fh:
147             yyml = yaml.load(fh)
148         with open(env_template, "r") as fh:
149             eyml = yaml.load(fh)
150
151         environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
152         return environment_pair
153
154     return None
155
156
157 def find_environment_file(yaml_files):
158     """
159     Pass file and recursively step backwards until environment file is found
160
161     :param yaml_files: list or string, start at size 1 and grows recursively
162     :return: corresponding environment file for a file, or None
163     """
164     # sanitize
165     if isinstance(yaml_files, str):
166         yaml_files = [yaml_files]
167
168     yaml_file = yaml_files[-1]
169     filepath, filename = os.path.split(yaml_file)
170
171     environment_pair = get_environment_pair(yaml_file)
172     if environment_pair:
173         return environment_pair
174
175     for file in os.listdir(filepath):
176         fq_name = "{}/{}".format(filepath, file)
177         if fq_name.endswith("yaml") or fq_name.endswith("yml"):
178             if fq_name not in yaml_files:
179                 with open(fq_name) as f:
180                     yml = yaml.load(f)
181                 resources = yml.get("resources", {})
182                 for resource_id, resource in resources.items():
183                     resource_type = resource.get("type", "")
184                     if resource_type == "OS::Heat::ResourceGroup":
185                         resource_type = (
186                             resource.get("properties", {})
187                             .get("resource_def", {})
188                             .get("type", "")
189                         )
190                     # found called nested file
191                     if resource_type == filename:
192                         yaml_files.append(fq_name)
193                         environment_pair = find_environment_file(yaml_files)
194
195     return environment_pair
196
197
198 def load_yaml(yaml_file):
199     """
200     Load the YAML file at the given path.  If the file has previously been
201     loaded, then a cached version will be returned.
202
203     :param yaml_file: path to the YAML file
204     :return: data structure loaded from the YAML file
205     """
206     with open(yaml_file) as fh:
207         return yaml.load(fh)
208
209
210 def traverse(data, search_key, func, path=None):
211     """
212     Traverse the data structure provided via ``data`` looking for occurences
213     of ``search_key``.  When ``search_key`` is found, the value associated
214     with that key is passed to ``func``
215
216     :param data:        arbitrary data structure of dicts and lists
217     :param search_key:  key field to search for
218     :param func:        Callable object that takes two parameters:
219                         * A list representing the path of keys to search_key
220                         * The value associated with the search_key
221     """
222     path = [] if path is None else path
223     if isinstance(data, dict):
224         for key, value in data.items():
225             curr_path = path + [key]
226             if key == search_key:
227                 func(curr_path, value)
228             traverse(value, search_key, func, curr_path)
229     elif isinstance(data, list):
230         for value in data:
231             curr_path = path + [value]
232             if isinstance(value, (dict, list)):
233                 traverse(value, search_key, func, curr_path)
234             elif value == search_key:
235                 func(curr_path, value)
236
237
238 def check_indices(pattern, values, value_type):
239     """
240     Checks that indices associated with the matched prefix start at 0 and
241     increment by 1.  It returns a list of messages for any prefixes that
242     violate the rules.
243
244     :param pattern: Compiled regex that whose first group matches the prefix and
245                     second group matches the index
246     :param values:  sequence of string names that may or may not match the pattern
247     :param name:    Type of value being checked (ex: IP Parameters). This will
248                     be included in the error messages.
249     :return:        List of error messages, empty list if no violations found
250     """
251     if not hasattr(pattern, "match"):
252         raise RuntimeError("Pattern must be a compiled regex")
253
254     prefix_indices = defaultdict(set)
255     for value in values:
256         m = pattern.match(value)
257         if m:
258             prefix_indices[m.group(1)].add(int(m.group(2)))
259
260     invalid_params = []
261     for prefix, indices in prefix_indices.items():
262         indices = sorted(indices)
263         if indices[0] != 0:
264             invalid_params.append(
265                 "{} with prefix {} do not start at 0".format(value_type, prefix)
266             )
267         elif len(indices) - 1 != indices[-1]:
268             invalid_params.append(
269                 (
270                     "Index values of {} with prefix {} do not " + "increment by 1: {}"
271                 ).format(value_type, prefix, indices)
272             )
273     return invalid_params
274
275
276 def get_base_template_from_yaml_files(yaml_files):
277     """Return first filepath to match RE_BASE
278     """
279     for filepath in yaml_files:
280         basename = get_base_template_from_yaml_file(filepath)
281         if basename:
282             return basename
283     return None
284
285
286 def get_base_template_from_yaml_file(yaml_file):
287     (dirname, filename) = os.path.split(yaml_file)
288     files = os.listdir(dirname)
289     for file in files:
290         basename, __ = os.path.splitext(os.path.basename(file))
291         if (
292             (__ == ".yaml" or __ == ".yml")
293             and RE_BASE.search(basename)
294             and basename.find("volume") == -1
295         ):
296             return os.path.join(dirname, "{}{}".format(basename, __))
297     return None
298
299
300 def parameter_type_to_heat_type(parameter):
301     # getting parameter format
302     if isinstance(parameter, list):
303         parameter_type = "comma_delimited_list"
304     elif isinstance(parameter, str):
305         parameter_type = "string"
306     elif isinstance(parameter, dict):
307         parameter_type = "json"
308     elif isinstance(parameter, int) or isinstance(parameter, float):
309         parameter_type = "number"
310     elif isinstance(parameter, bool):
311         parameter_type = "boolean"
312     else:
313         parameter_type = None
314
315     return parameter_type
316
317
318 def prop_iterator(resource, *props):
319     terminators = ["get_resource", "get_attr", "str_replace", "get_param"]
320     if "properties" in resource:
321         resource = resource.get("properties")
322     props = list(props)
323
324     if isinstance(resource, dict) and any(x for x in terminators if x in resource):
325         yield resource
326     else:
327         prop = resource.get(props.pop(0))
328         if isinstance(prop, list):
329             for x in prop:
330                 yield from prop_iterator(x, *props)
331         elif isinstance(prop, dict):
332             yield from prop_iterator(prop, *props)
333
334
335 def get_param(property_value):
336     """
337     Returns the first parameter name from a get_param or None if get_param is
338     not used
339     """
340     if property_value and isinstance(property_value, dict):
341         param = property_value.get("get_param")
342         if param and isinstance(param, list) and len(param) > 0:
343             return param[0]
344         else:
345             return param
346     return None
347
348
349 def get_output_dir(config):
350     """
351     Retrieve the output directory for the reports and create it if necessary
352     :param config: pytest configuration
353     :return: output directory as string
354     """
355     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
356     if not os.path.exists(output_dir):
357         os.makedirs(output_dir, exist_ok=True)
358     return output_dir
359
360
361 def first(seq, predicate, default=None):
362     """
363     Return the first item in sequence that satisfies the callable, predicate, or
364     returns the default if not found.
365
366     :param seq:         iterable sequence of objects
367     :param predicate:   callable that accepts one item from the sequence
368     :param default:     value to return if not found (default is None)
369     :return:            default value if no item satisfies the predicate
370     """
371     return next((i for i in seq if predicate(i)), default)
372
373
374 def check(predicate, message):
375     """
376     Raise a RuntimeError with the provided message if predicate is False.
377
378     Example:
379         check(path.is_file(), "{} must be a file".format(path.as_posix()))
380
381     :param predicate:   boolean condition
382     :param message:     message
383     """
384     if not predicate:
385         raise RuntimeError(message)
386
387
388 def unzip(zip_path, target_dir):
389     """
390     Extracts a Zip archive located at zip_path to target_dir (which will be
391     created if it already exists)
392
393     :param zip_path:    path to valid zip file
394     :param target_dir:  directory to unzip zip_path
395     """
396     check(zipfile.is_zipfile(zip_path), "{} is not a valid zipfile or does not exist".format(zip_path))
397     archive = zipfile.ZipFile(zip_path)
398     if not os.path.exists(target_dir):
399         os.makedirs(target_dir, exist_ok=True)
400     archive.extractall(path=target_dir)
401
402
403 def remove(sequence, exclude, key=None):
404     """
405     Remove a copy of sequence that items occur in exclude.
406
407     :param sequence: sequence of objects
408     :param exclude:  objects to excluded (must support ``in`` check)
409     :param key:      optional function to extract key from item in sequence
410     :return:         list of items not in the excluded
411     """
412     key_func = key if key else lambda x: x
413     result = (s for s in sequence if key_func(s) not in exclude)
414     return set(result) if isinstance(sequence, Set) else list(result)
415
416
417 def is_nova_server(resource):
418     """
419     checks resource is a nova server
420     """
421     return isinstance(resource, dict) and "type" in resource and "properties" in resource and resource.get("type") == "OS::Nova::Server"
422