2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
28 # https://creativecommons.org/licenses/by/4.0/
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
36 # ============LICENSE_END============================================
45 from collections import defaultdict
47 from boltons import funcutils
48 from tests import cached_yaml as yaml
50 __path__ = [os.path.dirname(os.path.abspath(__file__))]
51 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
52 RE_BASE = re.compile(r"(^base$)|(^base_)|(_base_)|(_base$)")
55 def is_base_module(template_path):
56 basename = os.path.basename(template_path).lower()
57 name, extension = os.path.splitext(basename)
58 is_yaml = extension in {".yml", ".yaml"}
59 return is_yaml and RE_BASE.search(name) and not name.endswith("_volume")
62 def check_basename_ending(template_type, basename):
64 return True/False if the template type is matching
69 elif template_type == "volume":
70 return basename.endswith("_volume")
72 return not basename.endswith("_volume")
75 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
77 get the parsed yaml for a list of yaml files
79 sections = [] if sections is None else sections
81 for yaml_file in yaml_files:
83 with open(yaml_file) as fh:
85 except yaml.YAMLError as e:
86 # pylint: disable=superfluous-parens
87 print("Error in %s: %s" % (yaml_file, e))
94 parsed_yml_list.append(yml)
95 return parsed_yml_list
98 def validates(*requirement_ids):
99 """Decorator that tags the test function with one or more requirement IDs.
102 >>> @validates('R-12345', 'R-12346')
103 ... def test_something():
105 >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
107 # pylint: disable=missing-docstring
109 # NOTE: We use a utility here to ensure that function signatures are
110 # maintained because pytest inspects function signatures to inject
111 # fixtures. I experimented with a few options, but this is the only
112 # library that worked. Other libraries dynamically generated a
113 # function at run-time, and then lost the requirement_ids attribute
114 @funcutils.wraps(func)
115 def wrapper(*args, **kw):
116 return func(*args, **kw)
118 wrapper.requirement_ids = requirement_ids
121 decorator.requirement_ids = requirement_ids
125 def categories(*categories):
127 @funcutils.wraps(func)
128 def wrapper(*args, **kw):
129 return func(*args, **kw)
131 wrapper.categories = categories
134 decorator.categories = categories
138 def get_environment_pair(heat_template):
139 """Returns a yaml/env pair given a yaml file"""
140 base_dir, filename = os.path.split(heat_template)
141 basename = os.path.splitext(filename)[0]
142 env_template = os.path.join(base_dir, "{}.env".format(basename))
143 if os.path.exists(env_template):
144 with open(heat_template, "r") as fh:
146 with open(env_template, "r") as fh:
149 environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
150 return environment_pair
155 def find_environment_file(yaml_files):
157 Pass file and recursively step backwards until environment file is found
159 :param yaml_files: list or string, start at size 1 and grows recursively
160 :return: corresponding environment file for a file, or None
163 if isinstance(yaml_files, str):
164 yaml_files = [yaml_files]
166 yaml_file = yaml_files[-1]
167 filepath, filename = os.path.split(yaml_file)
169 environment_pair = get_environment_pair(yaml_file)
171 return environment_pair
173 for file in os.listdir(filepath):
174 fq_name = "{}/{}".format(filepath, file)
175 if fq_name.endswith("yaml") or fq_name.endswith("yml"):
176 if fq_name not in yaml_files:
177 with open(fq_name) as f:
179 resources = yml.get("resources", {})
180 for resource_id, resource in resources.items():
181 resource_type = resource.get("type", "")
182 if resource_type == "OS::Heat::ResourceGroup":
184 resource.get("properties", {})
185 .get("resource_def", {})
188 # found called nested file
189 if resource_type == filename:
190 yaml_files.append(fq_name)
191 environment_pair = find_environment_file(yaml_files)
193 return environment_pair
196 def load_yaml(yaml_file):
198 Load the YAML file at the given path. If the file has previously been
199 loaded, then a cached version will be returned.
201 :param yaml_file: path to the YAML file
202 :return: data structure loaded from the YAML file
204 with open(yaml_file) as fh:
208 def traverse(data, search_key, func, path=None):
210 Traverse the data structure provided via ``data`` looking for occurences
211 of ``search_key``. When ``search_key`` is found, the value associated
212 with that key is passed to ``func``
214 :param data: arbitrary data structure of dicts and lists
215 :param search_key: key field to search for
216 :param func: Callable object that takes two parameters:
217 * A list representing the path of keys to search_key
218 * The value associated with the search_key
220 path = [] if path is None else path
221 if isinstance(data, dict):
222 for key, value in data.items():
223 curr_path = path + [key]
224 if key == search_key:
225 func(curr_path, value)
226 traverse(value, search_key, func, curr_path)
227 elif isinstance(data, list):
229 curr_path = path + [value]
230 if isinstance(value, (dict, list)):
231 traverse(value, search_key, func, curr_path)
232 elif value == search_key:
233 func(curr_path, value)
236 def check_indices(pattern, values, value_type):
238 Checks that indices associated with the matched prefix start at 0 and
239 increment by 1. It returns a list of messages for any prefixes that
242 :param pattern: Compiled regex that whose first group matches the prefix and
243 second group matches the index
244 :param values: sequence of string names that may or may not match the pattern
245 :param name: Type of value being checked (ex: IP Parameters). This will
246 be included in the error messages.
247 :return: List of error messages, empty list if no violations found
249 if not hasattr(pattern, "match"):
250 raise RuntimeError("Pattern must be a compiled regex")
252 prefix_indices = defaultdict(set)
254 m = pattern.match(value)
256 prefix_indices[m.group(1)].add(int(m.group(2)))
259 for prefix, indices in prefix_indices.items():
260 indices = sorted(indices)
262 invalid_params.append(
263 "{} with prefix {} do not start at 0".format(value_type, prefix)
265 elif len(indices) - 1 != indices[-1]:
266 invalid_params.append(
268 "Index values of {} with prefix {} do not " + "increment by 1: {}"
269 ).format(value_type, prefix, indices)
271 return invalid_params
274 def get_base_template_from_yaml_files(yaml_files):
275 """Return first filepath to match RE_BASE
277 for filepath in yaml_files:
278 basename = get_base_template_from_yaml_file(filepath)
284 def get_base_template_from_yaml_file(yaml_file):
285 (dirname, filename) = os.path.split(yaml_file)
286 files = os.listdir(dirname)
288 basename, __ = os.path.splitext(os.path.basename(file))
290 (__ == ".yaml" or __ == ".yml")
291 and RE_BASE.search(basename)
292 and basename.find("volume") == -1
294 return os.path.join(dirname, "{}{}".format(basename, __))
298 def parameter_type_to_heat_type(parameter):
299 # getting parameter format
300 if isinstance(parameter, list):
301 parameter_type = "comma_delimited_list"
302 elif isinstance(parameter, str):
303 parameter_type = "string"
304 elif isinstance(parameter, dict):
305 parameter_type = "json"
306 elif isinstance(parameter, int):
307 parameter_type = "number"
308 elif isinstance(parameter, float):
309 parameter_type = "number"
310 elif isinstance(parameter, bool):
311 parameter_type = "boolean"
313 parameter_type = None
315 return parameter_type
318 def prop_iterator(resource, *props):
319 terminators = ["get_resource", "get_attr", "str_replace", "get_param"]
320 if "properties" in resource:
321 resource = resource.get("properties")
324 if isinstance(resource, dict) and any(x for x in terminators if x in resource):
327 prop = resource.get(props.pop(0))
328 if isinstance(prop, list):
330 yield from prop_iterator(x, *props)
331 elif isinstance(prop, dict):
332 yield from prop_iterator(prop, *props)
335 def get_param(property_value):
337 Returns the first parameter name from a get_param or None if get_param is
340 if property_value and isinstance(property_value, dict):
341 param = property_value.get("get_param")
342 if param and isinstance(param, list) and len(param) > 0:
349 def get_output_dir(config):
351 Retrieve the output directory for the reports and create it if necessary
352 :param config: pytest configuration
353 :return: output directory as string
355 output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
356 if not os.path.exists(output_dir):
357 os.makedirs(output_dir, exist_ok=True)