Aligned test with updated R-610030
[vvp/validation-scripts.git] / ice_validator / preload / environment.py
1 import os
2 import re
3 import tempfile
4 from pathlib import Path
5 from typing import Any, Optional, Mapping
6
7 from cached_property import cached_property
8
9 from preload.data import AbstractPreloadInstance, AbstractPreloadDataSource
10 from preload.model import VnfModule
11 from tests.helpers import check, first, unzip, load_yaml
12
13 SERVICE_TEMPLATE_PATTERN = re.compile(r".*service-.*?-template.yml")
14 RESOURCE_TEMPLATE_PATTERN = re.compile(r".*resource-(.*?)-template.yml")
15
16 ZONE_PARAMS = ("availability_zone_0", "availability_zone_1", "availability_zone_2")
17
18
19 def yaml_files(path):
20     """
21     Return files that are YAML (end with .yml or .yaml)
22
23     :param path: Directory path object
24     :return: list of paths to YAML files
25     """
26     return [
27         p
28         for p in path.iterdir()
29         if p.is_file() and p.suffix.lower() in (".yml", ".yaml")
30     ]
31
32
33 class CloudServiceArchive:
34     """
35     Wrapper to extract information from a CSAR file.
36     """
37
38     def __init__(self, csar_path):
39         self.csar_path = Path(csar_path)
40         with tempfile.TemporaryDirectory() as csar_dir:
41             csar_dir = Path(csar_dir)
42             unzip(self.csar_path, csar_dir)
43             self._service = self._get_service_template(csar_dir)
44             self._resources = self._get_vf_module_resource_templates(csar_dir)
45
46     def get_vf_module(self, vf_module):
47         """
48         Retrieve the VF Module definition from the CSAR for the given heat
49         module name (should not include the file extension - ex: base)
50
51         :param vf_module: name of Heat module (no path or file extension)
52         :return: The definition of the module as a dict or None if not found
53         """
54         if (
55             vf_module.endswith(".env")
56             or vf_module.endswith(".yaml")
57             or vf_module.endswith(".yml")
58         ):
59             vf_module = os.path.splitext(vf_module)[0]
60         groups = self._service.get("topology_template", {}).get("groups", {})
61         for props in groups.values():
62             module_label = props.get("properties", {}).get("vf_module_label", "")
63             if module_label.lower() == vf_module.lower():
64                 return props
65         return None
66
67     def get_vf_module_model_name(self, vf_module):
68         """
69         Retrieves the vfModuleModelName of the module or None if vf_module is not
70         found (see get_vf_module)
71
72         :param vf_module: name of Heat module (no path or file extension)
73         :return: The value if vfModuleModelName as string or None if not found
74         """
75         module = self.get_vf_module(vf_module)
76         return module.get("metadata", {}).get("vfModuleModelName") if module else None
77
78     @property
79     def topology_template(self):
80         """
81         Return dict representing the topology_template node of the service
82         template
83         """
84         return self._service.get("topology_template") or {}
85
86     @property
87     def groups(self):
88         """
89         Return dict representing the groups node of the service
90         template
91         """
92         return self.topology_template.get("groups") or {}
93
94     @property
95     def vf_modules(self):
96         """
97         Returns mapping of group ID to VfModule present in the service template
98         """
99         return {
100             group_id: props
101             for group_id, props in self.groups.items()
102             if props.get("type") == "org.openecomp.groups.VfModule"
103         }
104
105     def get_vnf_type(self, module):
106         """
107         Concatenation of service and VF instance name
108         """
109         service_name = self.service_name
110         instance_name = self.get_vf_module_resource_name(module)
111         if service_name and instance_name:
112             return "{}/{} 0".format(service_name, instance_name)
113
114     @property
115     def vf_module_resource_names(self):
116         """
117         Returns the resource names for all VfModules (these can be used
118         to find the resource templates as they will be part of the filename)
119         """
120         names = (
121             module.get("metadata", {}).get("vfModuleModelName")
122             for module in self.vf_modules.values()
123         )
124         return [name.split(".")[0] for name in names if name]
125
126     def get_vf_module_resource_name(self, vf_module):
127         """
128         Retrieves the resource name of the module or None if vf_module is not
129         found (see get_vf_module)
130
131         :param vf_module: name of Heat module (no path or file extension)
132         :return: The value if resource nae as string or None if not found
133         """
134         vf_model_name = self.get_vf_module_model_name(vf_module)
135         if not vf_model_name:
136             return None
137         resource_name = vf_model_name.split(".")[0]
138         resource = self._resources.get(resource_name, {})
139         return resource.get("metadata", {}).get("name")
140
141     @staticmethod
142     def _get_definition_files(csar_dir):
143         """
144         Returns a list of all files in the CSAR's Definitions directory
145         """
146         def_dir = csar_dir / "Definitions"
147         check(
148             def_dir.exists(),
149             "CSAR is invalid. {} does not contain a Definitions directory.".format(
150                 csar_dir.as_posix()
151             ),
152         )
153         return yaml_files(def_dir)
154
155     def _get_service_template(self, csar_dir):
156         """
157         Returns the service template as a dict.  Assumes there is only one.
158         """
159         files = map(str, self._get_definition_files(csar_dir))
160         service_template = first(files, SERVICE_TEMPLATE_PATTERN.match)
161         return load_yaml(service_template) if service_template else {}
162
163     def _get_vf_module_resource_templates(self, csar_dir):
164         """
165         Returns a mapping of resource name to resource definition (as a dict)
166         (Only loads resource templates that correspond to VF Modules
167         """
168         def_dir = csar_dir / "Definitions"
169         mapping = (
170             (name, def_dir / "resource-{}-template.yml".format(name))
171             for name in self.vf_module_resource_names
172         )
173         return {name: load_yaml(path) for name, path in mapping if path.exists()}
174
175     @property
176     def service_name(self):
177         """
178         Name of the service (extracted from the service template
179         """
180         return self._service.get("metadata", {}).get("name")
181
182     def __repr__(self):
183         return "CSAR (path={}, name={})".format(self.csar_path.name, self.service_name)
184
185     def __str__(self):
186         return repr(self)
187
188
189 class PreloadEnvironment:
190     def __init__(self, env_dir, parent=None):
191         self.base_dir = Path(env_dir)
192         self.parent = parent
193         self._modules = self._load_modules()
194         self._sub_env = self._load_envs()
195         self._defaults = self._load_defaults()
196
197     def _load_defaults(self):
198         defaults = self.base_dir / "defaults.yaml"
199         return load_yaml(defaults) if defaults.exists() else {}
200
201     def _load_modules(self):
202         files = [
203             p
204             for p in self.base_dir.iterdir()
205             if p.is_file() and p.suffix.lower().endswith(".env")
206         ]
207         return {f.name.lower(): load_yaml(f).get("parameters", {}) for f in files}
208
209     def _load_envs(self):
210         env_dirs = [
211             p for p in self.base_dir.iterdir() if p.is_dir() and p.name != "preloads"
212         ]
213         return {d.name: PreloadEnvironment(d, self) for d in env_dirs}
214
215     @cached_property
216     def csar(self):
217         csar_path = first(self.base_dir.iterdir(), lambda p: p.suffix == ".csar")
218         if csar_path:
219             return CloudServiceArchive(csar_path)
220         else:
221             return self.parent.csar if self.parent else None
222
223     @property
224     def defaults(self):
225         result = {}
226         if self.parent:
227             result.update(self.parent.defaults)
228         result.update(self._defaults)
229         return result
230
231     @property
232     def environments(self):
233         all_envs = [self]
234         for env in self._sub_env.values():
235             all_envs.append(env)
236             all_envs.extend(env.environments)
237         return [e for e in all_envs if e.is_leaf]
238
239     def get_module(self, name):
240         name = name if name.lower().endswith(".env") else "{}.env".format(name).lower()
241         if name not in self.module_names:
242             return {}
243         result = {}
244         parent_module = self.parent.get_module(name) if self.parent else None
245         module = self._modules.get(name)
246         for m in (parent_module, self.defaults, module):
247             if m:
248                 result.update(m)
249         if self.csar:
250             vnf_type = self.csar.get_vnf_type(name)
251             if vnf_type:
252                 result["vnf-type"] = vnf_type
253             model_name = self.csar.get_vf_module_model_name(name)
254             if model_name:
255                 result["vf-module-model-name"] = model_name
256         return result
257
258     @property
259     def module_names(self):
260         parent_modules = self.parent.module_names if self.parent else set()
261         result = set()
262         result.update(self._modules.keys())
263         result.update(parent_modules)
264         return result
265
266     @property
267     def modules(self):
268         return {name: self.get_module(name) for name in self.module_names}
269
270     def get_environment(self, env_name):
271         for name, env in self._sub_env.items():
272             if name == env_name:
273                 return env
274             result = env.get_environment(env_name)
275             if result:
276                 return result
277         return None
278
279     @property
280     def is_base(self):
281         return self.parent is None
282
283     @property
284     def is_leaf(self):
285         return not self._sub_env
286
287     @property
288     def name(self):
289         return self.base_dir.name
290
291     def __repr__(self):
292         return "PreloadEnvironment(name={})".format(self.name)
293
294
295 class EnvironmentFilePreloadInstance(AbstractPreloadInstance):
296     def __init__(self, env: PreloadEnvironment, module_label: str, module_params: dict):
297         self.module_params = module_params
298         self._module_label = module_label
299         self.env = env
300         self.env_cache = {}
301
302     @property
303     def flag_incompletes(self) -> bool:
304         return True
305
306     @property
307     def preload_basename(self) -> str:
308         return self.module_label
309
310     @property
311     def output_dir(self) -> Path:
312         return self.env.base_dir.joinpath("preloads")
313
314     @property
315     def module_label(self) -> str:
316         return self._module_label
317
318     @property
319     def vf_module_name(self) -> str:
320         return self.get_param("vf_module_name")
321
322     @property
323     def vnf_name(self) -> Optional[str]:
324         return self.get_param("vnf_name")
325
326     @property
327     def vnf_type(self) -> Optional[str]:
328         return self.get_param("vnf-type")
329
330     @property
331     def vf_module_model_name(self) -> Optional[str]:
332         return self.get_param("vf-module-model-name")
333
334     def get_availability_zone(self, index: int, param_name: str) -> Optional[str]:
335         return self.get_param(param_name)
336
337     def get_network_name(self, network_role: str, name_param: str) -> Optional[str]:
338         return self.get_param(name_param)
339
340     def get_subnet_id(
341         self, network_role: str, ip_version: int, param_name: str
342     ) -> Optional[str]:
343         return self.get_param(param_name)
344
345     def get_subnet_name(
346         self, network_role: str, ip_version: int, param_name: str
347     ) -> Optional[str]:
348         # Not supported with env files
349         return None
350
351     def get_vm_name(self, vm_type: str, index: int, param_name: str) -> Optional[str]:
352         return self.get_param(param_name, single=True)
353
354     def get_floating_ip(
355         self, vm_type: str, network_role: str, ip_version: int, param_name: str
356     ) -> Optional[str]:
357         return self.get_param(param_name)
358
359     def get_fixed_ip(
360         self, vm_type: str, network_role: str, ip_version: int, index: int, param: str
361     ) -> Optional[str]:
362         return self.get_param(param, single=True)
363
364     def get_vnf_parameter(self, key: str, value: Any) -> Optional[str]:
365         module_value = self.get_param(key)
366         return module_value or value
367
368     def get_additional_parameters(self) -> Mapping[str, Any]:
369         return {}
370
371     def get_param(self, param_name, single=False):
372         """
373         Retrieves the value for the given param if it exists. If requesting a
374         single item, and the parameter is tied to a list then only one item from
375         the list will be returned.  For each subsequent call with the same parameter
376         it will iterate/rotate through the values in that list.  If single is False
377         then the full list will be returned.
378
379         :param param_name:  name of the parameter
380         :param single:      If True returns single value from lists otherwises the full
381                             list.  This has no effect on non-list values
382         """
383         value = self.env_cache.get(param_name)
384         if not value:
385             value = self.module_params.get(param_name)
386             if isinstance(value, list):
387                 value = value.copy()
388                 value.reverse()
389             self.env_cache[param_name] = value
390
391         if value and single and isinstance(value, list):
392             result = value.pop()
393         else:
394             result = value
395         return result if result != "CHANGEME" else None
396
397
398 class EnvironmentFileDataSource(AbstractPreloadDataSource):
399     def __init__(self, path: Path):
400         super().__init__(path)
401         check(path.is_dir(), f"{path} must be an existing directory")
402         self.path = path
403         self.env = PreloadEnvironment(path)
404
405     @classmethod
406     def get_source_type(cls) -> str:
407         return "DIR"
408
409     @classmethod
410     def get_identifier(self) -> str:
411         return "envfiles"
412
413     @classmethod
414     def get_name(self) -> str:
415         return "Environment Files"
416
417     def get_module_preloads(self, module: VnfModule):
418         for env in self.env.environments:
419             module_params = env.get_module(module.label)
420             yield EnvironmentFilePreloadInstance(env, module.label, module_params)