2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
28 # https://creativecommons.org/licenses/by/4.0/
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
36 # ============LICENSE_END============================================
46 from collections import defaultdict
47 from typing import Set
49 from boltons import funcutils
50 from tests import cached_yaml as yaml
52 __path__ = [os.path.dirname(os.path.abspath(__file__))]
53 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
54 RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
56 INTRINSIC_FUNCTIONS = [
74 def is_base_module(template_path):
75 basename = os.path.basename(template_path).lower()
76 name, extension = os.path.splitext(basename)
77 is_yaml = extension in {".yml", ".yaml"}
78 return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
81 def check_basename_ending(template_type, basename):
83 return True/False if the template type is matching
88 elif template_type == "volume":
89 return basename.endswith("_volume")
91 return not basename.endswith("_volume")
94 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
96 get the parsed yaml for a list of yaml files
98 sections = [] if sections is None else sections
100 for yaml_file in yaml_files:
102 with open(yaml_file) as fh:
104 except yaml.YAMLError as e:
105 # pylint: disable=superfluous-parens
106 print("Error in %s: %s" % (yaml_file, e))
111 if k not in sections:
113 parsed_yml_list.append(yml)
114 return parsed_yml_list
117 def validates(*requirement_ids):
118 """Decorator that tags the test function with one or more requirement IDs.
121 >>> @validates('R-12345', 'R-12346')
122 ... def test_something():
124 >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
126 # pylint: disable=missing-docstring
128 # NOTE: We use a utility here to ensure that function signatures are
129 # maintained because pytest inspects function signatures to inject
130 # fixtures. I experimented with a few options, but this is the only
131 # library that worked. Other libraries dynamically generated a
132 # function at run-time, and then lost the requirement_ids attribute
133 @funcutils.wraps(func)
134 def wrapper(*args, **kw):
135 return func(*args, **kw)
137 wrapper.requirement_ids = requirement_ids
140 decorator.requirement_ids = requirement_ids
144 def categories(*all_of, any_of=None):
145 any_of = set(any_of) if any_of else set()
146 all_of = set(all_of) if all_of else set()
149 @funcutils.wraps(func)
150 def wrapper(*args, **kw):
151 return func(*args, **kw)
153 wrapper.all_categories = all_of
154 wrapper.any_categories = any_of
157 decorator.all_categories = all_of
158 decorator.any_categories = any_of
162 def get_environment_pair(heat_template):
163 """Returns a yaml/env pair given a yaml file"""
164 base_dir, filename = os.path.split(heat_template)
165 basename = os.path.splitext(filename)[0]
166 env_template = os.path.join(base_dir, "{}.env".format(basename))
167 if os.path.exists(env_template):
168 with open(heat_template, "r") as fh:
170 with open(env_template, "r") as fh:
173 environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
174 return environment_pair
179 def find_environment_file(yaml_files):
181 Pass file and recursively step backwards until environment file is found
183 :param yaml_files: list or string, start at size 1 and grows recursively
184 :return: corresponding environment file for a file, or None
187 if isinstance(yaml_files, str):
188 yaml_files = [yaml_files]
190 yaml_file = yaml_files[-1]
191 filepath, filename = os.path.split(yaml_file)
193 environment_pair = get_environment_pair(yaml_file)
195 return environment_pair
197 for file in os.listdir(filepath):
198 fq_name = "{}/{}".format(filepath, file)
199 if fq_name.endswith("yaml") or fq_name.endswith("yml"):
200 if fq_name not in yaml_files:
201 with open(fq_name) as f:
203 resources = yml.get("resources", {})
204 for resource_id, resource in resources.items():
205 resource_type = resource.get("type", "")
206 if resource_type == "OS::Heat::ResourceGroup":
208 resource.get("properties", {})
209 .get("resource_def", {})
212 # found called nested file
213 if resource_type == filename:
214 yaml_files.append(fq_name)
215 environment_pair = find_environment_file(yaml_files)
217 return environment_pair
220 def load_yaml(yaml_file):
222 Load the YAML file at the given path. If the file has previously been
223 loaded, then a cached version will be returned.
225 :param yaml_file: path to the YAML file
226 :return: data structure loaded from the YAML file
228 with open(yaml_file) as fh:
232 def traverse(data, search_key, func, path=None):
234 Traverse the data structure provided via ``data`` looking for occurences
235 of ``search_key``. When ``search_key`` is found, the value associated
236 with that key is passed to ``func``
238 :param data: arbitrary data structure of dicts and lists
239 :param search_key: key field to search for
240 :param func: Callable object that takes two parameters:
241 * A list representing the path of keys to search_key
242 * The value associated with the search_key
244 path = [] if path is None else path
245 if isinstance(data, dict):
246 for key, value in data.items():
247 curr_path = path + [key]
248 if key == search_key:
249 func(curr_path, value)
250 traverse(value, search_key, func, curr_path)
251 elif isinstance(data, list):
253 curr_path = path + [value]
254 if isinstance(value, (dict, list)):
255 traverse(value, search_key, func, curr_path)
256 elif value == search_key:
257 func(curr_path, value)
258 elif search_key == data:
259 curr_path = path + [data]
260 func(curr_path, data)
263 def check_indices(pattern, values, value_type):
265 Checks that indices associated with the matched prefix start at 0 and
266 increment by 1. It returns a list of messages for any prefixes that
269 :param pattern: Compiled regex that whose first group matches the prefix and
270 second group matches the index
271 :param values: sequence of string names that may or may not match the pattern
272 :param name: Type of value being checked (ex: IP Parameters). This will
273 be included in the error messages.
274 :return: List of error messages, empty list if no violations found
276 if not hasattr(pattern, "match"):
277 raise RuntimeError("Pattern must be a compiled regex")
279 prefix_indices = defaultdict(set)
281 m = pattern.match(value)
283 prefix_indices[m.group(1)].add(int(m.group(2)))
286 for prefix, indices in prefix_indices.items():
287 indices = sorted(indices)
289 invalid_params.append(
290 "{} with prefix {} do not start at 0".format(value_type, prefix)
292 elif len(indices) - 1 != indices[-1]:
293 invalid_params.append(
295 "Index values of {} with prefix {} do not " + "increment by 1: {}"
296 ).format(value_type, prefix, indices)
298 return invalid_params
301 def get_base_template_from_yaml_files(yaml_files):
302 """Return first filepath to match RE_BASE
304 for filepath in yaml_files:
305 basename = get_base_template_from_yaml_file(filepath)
311 def get_base_template_from_yaml_file(yaml_file):
312 (dirname, filename) = os.path.split(yaml_file)
313 files = os.listdir(dirname)
315 basename, __ = os.path.splitext(os.path.basename(file))
317 (__ == ".yaml" or __ == ".yml")
318 and RE_BASE.search(basename)
319 and basename.find("volume") == -1
321 return os.path.join(dirname, "{}{}".format(basename, __))
325 def parameter_type_to_heat_type(parameter):
326 # getting parameter format
327 if isinstance(parameter, list):
328 parameter_type = "comma_delimited_list"
329 elif isinstance(parameter, str):
330 parameter_type = "string"
331 elif isinstance(parameter, dict):
332 parameter_type = "json"
333 elif isinstance(parameter, int) or isinstance(parameter, float):
334 parameter_type = "number"
335 elif isinstance(parameter, bool):
336 parameter_type = "boolean"
338 parameter_type = None
340 return parameter_type
343 def prop_iterator(resource, *props):
344 if "properties" in resource:
345 resource = resource.get("properties")
348 if isinstance(resource, dict) and any(
349 x for x in INTRINSIC_FUNCTIONS if x in resource
353 prop = resource.get(props.pop(0))
354 if isinstance(prop, list):
356 yield from prop_iterator(x, *props)
357 elif isinstance(prop, dict):
358 yield from prop_iterator(prop, *props)
361 def get_param(property_value):
363 Returns the first parameter name from a get_param or None if get_param is
366 if property_value and isinstance(property_value, dict):
367 param = property_value.get("get_param")
368 if param and isinstance(param, list) and len(param) > 0:
375 def get_output_dir(config):
377 Retrieve the output directory for the reports and create it if necessary
378 :param config: pytest configuration
379 :return: output directory as string
381 output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
382 if not os.path.exists(output_dir):
383 os.makedirs(output_dir, exist_ok=True)
387 def first(seq, predicate, default=None):
389 Return the first item in sequence that satisfies the callable, predicate, or
390 returns the default if not found.
392 :param seq: iterable sequence of objects
393 :param predicate: callable that accepts one item from the sequence
394 :param default: value to return if not found (default is None)
395 :return: default value if no item satisfies the predicate
397 return next((i for i in seq if predicate(i)), default)
400 def check(predicate, message):
402 Raise a RuntimeError with the provided message if predicate is False.
405 check(path.is_file(), "{} must be a file".format(path.as_posix()))
407 :param predicate: boolean condition
408 :param message: message
411 raise RuntimeError(message)
414 def unzip(zip_path, target_dir):
416 Extracts a Zip archive located at zip_path to target_dir (which will be
417 created if it already exists)
419 :param zip_path: path to valid zip file
420 :param target_dir: directory to unzip zip_path
423 zipfile.is_zipfile(zip_path),
424 "{} is not a valid zipfile or does not exist".format(zip_path),
426 archive = zipfile.ZipFile(zip_path)
427 if not os.path.exists(target_dir):
428 os.makedirs(target_dir, exist_ok=True)
429 archive.extractall(path=target_dir)
432 def remove(sequence, exclude, key=None):
434 Remove a copy of sequence that items occur in exclude.
436 :param sequence: sequence of objects
437 :param exclude: objects to excluded (must support ``in`` check)
438 :param key: optional function to extract key from item in sequence
439 :return: list of items not in the excluded
441 key_func = key if key else lambda x: x
442 result = (s for s in sequence if key_func(s) not in exclude)
443 return set(result) if isinstance(sequence, Set) else list(result)
446 def is_nova_server(resource):
448 checks resource is a nova server
451 isinstance(resource, dict)
452 and "type" in resource
453 and "properties" in resource
454 and resource.get("type") == "OS::Nova::Server"