2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
28 # https://creativecommons.org/licenses/by/4.0/
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
36 # ============LICENSE_END============================================
38 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
45 from collections import defaultdict
47 from boltons import funcutils
48 from tests import cached_yaml as yaml
53 def check_basename_ending(template_type, basename):
55 return True/False if the template type is matching
60 elif template_type == "volume":
61 return basename.endswith("_volume")
63 return not basename.endswith("_volume")
66 def get_parsed_yml_for_yaml_files(yaml_files, sections=None):
68 get the parsed yaml for a list of yaml files
70 sections = [] if sections is None else sections
72 for yaml_file in yaml_files:
74 with open(yaml_file) as fh:
76 except yaml.YAMLError as e:
77 # pylint: disable=superfluous-parens
78 print("Error in %s: %s" % (yaml_file, e))
85 parsed_yml_list.append(yml)
86 return parsed_yml_list
89 def validates(*requirement_ids):
90 """Decorator that tags the test function with one or more requirement IDs.
93 >>> @validates('R-12345', 'R-12346')
94 ... def test_something():
96 >>> assert test_something.requirement_ids == ['R-12345', 'R-12346']
98 # pylint: disable=missing-docstring
100 # NOTE: We use a utility here to ensure that function signatures are
101 # maintained because pytest inspects function signatures to inject
102 # fixtures. I experimented with a few options, but this is the only
103 # library that worked. Other libraries dynamically generated a
104 # function at run-time, and then lost the requirement_ids attribute
105 @funcutils.wraps(func)
106 def wrapper(*args, **kw):
107 return func(*args, **kw)
109 wrapper.requirement_ids = requirement_ids
112 decorator.requirement_ids = requirement_ids
116 def categories(*categories):
118 @funcutils.wraps(func)
119 def wrapper(*args, **kw):
120 return func(*args, **kw)
122 wrapper.categories = categories
125 decorator.categories = categories
129 def get_environment_pair(heat_template):
130 """Returns a yaml/env pair given a yaml file"""
131 base_dir, filename = os.path.split(heat_template)
132 basename = os.path.splitext(filename)[0]
133 env_template = os.path.join(base_dir, "{}.env".format(basename))
134 if os.path.exists(env_template):
135 with open(heat_template, "r") as fh:
137 with open(env_template, "r") as fh:
140 environment_pair = {"name": basename, "yyml": yyml, "eyml": eyml}
141 return environment_pair
146 def find_environment_file(yaml_files):
148 Pass file and recursively step backwards until environment file is found
150 :param yaml_files: list or string, start at size 1 and grows recursively
151 :return: corresponding environment file for a file, or None
154 if isinstance(yaml_files, str):
155 yaml_files = [yaml_files]
157 yaml_file = yaml_files[-1]
158 filepath, filename = os.path.split(yaml_file)
160 environment_pair = get_environment_pair(yaml_file)
162 return environment_pair
164 for file in os.listdir(filepath):
165 fq_name = "{}/{}".format(filepath, file)
166 if fq_name.endswith("yaml") or fq_name.endswith("yml"):
167 if fq_name not in yaml_files:
168 with open(fq_name) as f:
170 resources = yml.get("resources", {})
171 for resource_id, resource in resources.items():
172 resource_type = resource.get("type", "")
173 if resource_type == "OS::Heat::ResourceGroup":
175 resource.get("properties", {})
176 .get("resource_def", {})
179 # found called nested file
180 if resource_type == filename:
181 yaml_files.append(fq_name)
182 environment_pair = find_environment_file(yaml_files)
184 return environment_pair
187 def load_yaml(yaml_file):
189 Load the YAML file at the given path. If the file has previously been
190 loaded, then a cached version will be returned.
192 :param yaml_file: path to the YAML file
193 :return: data structure loaded from the YAML file
195 with open(yaml_file) as fh:
199 def traverse(data, search_key, func, path=None):
201 Traverse the data structure provided via ``data`` looking for occurences
202 of ``search_key``. When ``search_key`` is found, the value associated
203 with that key is passed to ``func``
205 :param data: arbitrary data structure of dicts and lists
206 :param search_key: key field to search for
207 :param func: Callable object that takes two parameters:
208 * A list representing the path of keys to search_key
209 * The value associated with the search_key
211 path = [] if path is None else path
212 if isinstance(data, dict):
213 for key, value in data.items():
214 curr_path = path + [key]
215 if key == search_key:
216 func(curr_path, value)
217 traverse(value, search_key, func, curr_path)
218 elif isinstance(data, list):
220 curr_path = path + [value]
221 if isinstance(value, dict):
222 traverse(value, search_key, func, curr_path)
223 elif value == search_key:
224 func(curr_path, value)
227 def check_indices(pattern, values, value_type):
229 Checks that indices associated with the matched prefix start at 0 and
230 increment by 1. It returns a list of messages for any prefixes that
233 :param pattern: Compiled regex that whose first group matches the prefix and
234 second group matches the index
235 :param values: sequence of string names that may or may not match the pattern
236 :param name: Type of value being checked (ex: IP Parameters). This will
237 be included in the error messages.
238 :return: List of error messages, empty list if no violations found
240 if not hasattr(pattern, "match"):
241 raise RuntimeError("Pattern must be a compiled regex")
243 prefix_indices = defaultdict(set)
245 m = pattern.match(value)
247 prefix_indices[m.group(1)].add(int(m.group(2)))
250 for prefix, indices in prefix_indices.items():
251 indices = sorted(indices)
253 invalid_params.append(
254 "{} with prefix {} do not start at 0".format(value_type, prefix)
256 elif len(indices) - 1 != indices[-1]:
257 invalid_params.append(
259 "Index values of {} with prefix {} do not " + "increment by 1: {}"
260 ).format(value_type, prefix, indices)
262 return invalid_params