[VVP] Support pluggable data sources for preload data
[vvp/validation-scripts.git] / ice_validator / preload / environment.py
1 import re
2 import tempfile
3 from pathlib import Path
4 from typing import Any, Optional, Mapping
5
6 from cached_property import cached_property
7
8 from preload.data import AbstractPreloadInstance, AbstractPreloadDataSource
9 from preload.model import VnfModule
10 from tests.helpers import check, first, unzip, load_yaml
11
12 SERVICE_TEMPLATE_PATTERN = re.compile(r".*service-.*?-template.yml")
13 RESOURCE_TEMPLATE_PATTERN = re.compile(r".*resource-(.*?)-template.yml")
14
15 ZONE_PARAMS = ("availability_zone_0", "availability_zone_1", "availability_zone_2")
16
17
18 def yaml_files(path):
19     """
20     Return files that are YAML (end with .yml or .yaml)
21
22     :param path: Directory path object
23     :return: list of paths to YAML files
24     """
25     return [
26         p
27         for p in path.iterdir()
28         if p.is_file() and p.suffix.lower() in (".yml", ".yaml")
29     ]
30
31
32 class CloudServiceArchive:
33     """
34     Wrapper to extract information from a CSAR file.
35     """
36
37     def __init__(self, csar_path):
38         self.csar_path = Path(csar_path)
39         with tempfile.TemporaryDirectory() as csar_dir:
40             csar_dir = Path(csar_dir)
41             unzip(self.csar_path, csar_dir)
42             self._service = self._get_service_template(csar_dir)
43             self._resources = self._get_vf_module_resource_templates(csar_dir)
44
45     def get_vf_module(self, vf_module):
46         """
47         Retrieve the VF Module definition from the CSAR for the given heat
48         module name (should not include the file extension - ex: base)
49
50         :param vf_module: name of Heat module (no path or file extension)
51         :return: The definition of the module as a dict or None if not found
52         """
53         groups = self._service.get("topology_template", {}).get("groups", {})
54         for props in groups.values():
55             module_label = props.get("properties", {}).get("vf_module_label", "")
56             if module_label.lower() == vf_module.lower():
57                 return props
58         return None
59
60     def get_vf_module_model_name(self, vf_module):
61         """
62         Retrieves the vfModuleModelName of the module or None if vf_module is not
63         found (see get_vf_module)
64
65         :param vf_module: name of Heat module (no path or file extension)
66         :return: The value if vfModuleModelName as string or None if not found
67         """
68         module = self.get_vf_module(vf_module)
69         return module.get("metadata", {}).get("vfModuleModelName") if module else None
70
71     @property
72     def topology_template(self):
73         """
74         Return dict representing the topology_template node of the service
75         template
76         """
77         return self._service.get("topology_template") or {}
78
79     @property
80     def groups(self):
81         """
82         Return dict representing the groups node of the service
83         template
84         """
85         return self.topology_template.get("groups") or {}
86
87     @property
88     def vf_modules(self):
89         """
90         Returns mapping of group ID to VfModule present in the service template
91         """
92         return {
93             group_id: props
94             for group_id, props in self.groups.items()
95             if props.get("type") == "org.openecomp.groups.VfModule"
96         }
97
98     def get_vnf_type(self, module):
99         """
100         Concatenation of service and VF instance name
101         """
102         service_name = self.service_name
103         instance_name = self.get_vf_module_resource_name(module)
104         if service_name and instance_name:
105             return "{}/{}".format(service_name, instance_name)
106
107     @property
108     def vf_module_resource_names(self):
109         """
110         Returns the resource names for all VfModules (these can be used
111         to find the resource templates as they will be part of the filename)
112         """
113         names = (
114             module.get("metadata", {}).get("vfModuleModelName")
115             for module in self.vf_modules.values()
116         )
117         return [name.split(".")[0] for name in names if name]
118
119     def get_vf_module_resource_name(self, vf_module):
120         """
121         Retrieves the resource name of the module or None if vf_module is not
122         found (see get_vf_module)
123
124         :param vf_module: name of Heat module (no path or file extension)
125         :return: The value if resource nae as string or None if not found
126         """
127         vf_model_name = self.get_vf_module_model_name(vf_module)
128         if not vf_model_name:
129             return None
130         resource_name = vf_model_name.split(".")[0]
131         resource = self._resources.get(resource_name, {})
132         return resource.get("metadata", {}).get("name")
133
134     @staticmethod
135     def _get_definition_files(csar_dir):
136         """
137         Returns a list of all files in the CSAR's Definitions directory
138         """
139         def_dir = csar_dir / "Definitions"
140         check(
141             def_dir.exists(),
142             "CSAR is invalid. {} does not contain a Definitions directory.".format(
143                 csar_dir.as_posix()
144             ),
145         )
146         return yaml_files(def_dir)
147
148     def _get_service_template(self, csar_dir):
149         """
150         Returns the service template as a dict.  Assumes there is only one.
151         """
152         files = map(str, self._get_definition_files(csar_dir))
153         service_template = first(files, SERVICE_TEMPLATE_PATTERN.match)
154         return load_yaml(service_template) if service_template else {}
155
156     def _get_vf_module_resource_templates(self, csar_dir):
157         """
158         Returns a mapping of resource name to resource definition (as a dict)
159         (Only loads resource templates that correspond to VF Modules
160         """
161         def_dir = csar_dir / "Definitions"
162         mapping = (
163             (name, def_dir / "resource-{}-template.yml".format(name))
164             for name in self.vf_module_resource_names
165         )
166         return {name: load_yaml(path) for name, path in mapping if path.exists()}
167
168     @property
169     def service_name(self):
170         """
171         Name of the service (extracted from the service template
172         """
173         return self._service.get("metadata", {}).get("name")
174
175     def __repr__(self):
176         return "CSAR (path={}, name={})".format(self.csar_path.name, self.service_name)
177
178     def __str__(self):
179         return repr(self)
180
181
182 class PreloadEnvironment:
183     def __init__(self, env_dir, parent=None):
184         self.base_dir = Path(env_dir)
185         self.parent = parent
186         self._modules = self._load_modules()
187         self._sub_env = self._load_envs()
188         self._defaults = self._load_defaults()
189
190     def _load_defaults(self):
191         defaults = self.base_dir / "defaults.yaml"
192         return load_yaml(defaults) if defaults.exists() else {}
193
194     def _load_modules(self):
195         files = [
196             p
197             for p in self.base_dir.iterdir()
198             if p.is_file() and p.suffix.lower().endswith(".env")
199         ]
200         return {f.name.lower(): load_yaml(f).get("parameters", {}) for f in files}
201
202     def _load_envs(self):
203         env_dirs = [
204             p for p in self.base_dir.iterdir() if p.is_dir() and p.name != "preloads"
205         ]
206         return {d.name: PreloadEnvironment(d, self) for d in env_dirs}
207
208     @cached_property
209     def csar(self):
210         csar_path = first(self.base_dir.iterdir(), lambda p: p.suffix == ".csar")
211         if csar_path:
212             return CloudServiceArchive(csar_path)
213         else:
214             return self.parent.csar if self.parent else None
215
216     @property
217     def defaults(self):
218         result = {}
219         if self.parent:
220             result.update(self.parent.defaults)
221         result.update(self._defaults)
222         return result
223
224     @property
225     def environments(self):
226         all_envs = [self]
227         for env in self._sub_env.values():
228             all_envs.append(env)
229             all_envs.extend(env.environments)
230         return [e for e in all_envs if e.is_leaf]
231
232     def get_module(self, name):
233         name = name if name.lower().endswith(".env") else "{}.env".format(name).lower()
234         if name not in self.module_names:
235             return {}
236         result = {}
237         parent_module = self.parent.get_module(name) if self.parent else None
238         module = self._modules.get(name)
239         for m in (parent_module, self.defaults, module):
240             if m:
241                 result.update(m)
242         if self.csar:
243             vnf_type = self.csar.get_vnf_type(name)
244             if vnf_type:
245                 result["vnf-type"] = vnf_type
246             model_name = self.csar.get_vf_module_model_name(name)
247             if model_name:
248                 result["vf-module-model-name"] = model_name
249         return result
250
251     @property
252     def module_names(self):
253         parent_modules = self.parent.module_names if self.parent else set()
254         result = set()
255         result.update(self._modules.keys())
256         result.update(parent_modules)
257         return result
258
259     @property
260     def modules(self):
261         return {name: self.get_module(name) for name in self.module_names}
262
263     def get_environment(self, env_name):
264         for name, env in self._sub_env.items():
265             if name == env_name:
266                 return env
267             result = env.get_environment(env_name)
268             if result:
269                 return result
270         return None
271
272     @property
273     def is_base(self):
274         return self.parent is None
275
276     @property
277     def is_leaf(self):
278         return not self._sub_env
279
280     @property
281     def name(self):
282         return self.base_dir.name
283
284     def __repr__(self):
285         return "PreloadEnvironment(name={})".format(self.name)
286
287
288 class EnvironmentFilePreloadInstance(AbstractPreloadInstance):
289
290     def __init__(self, env: PreloadEnvironment, module_label: str, module_params: dict):
291         self.module_params = module_params
292         self._module_label = module_label
293         self.env = env
294         self.env_cache = {}
295
296     @property
297     def flag_incompletes(self) -> bool:
298         return True
299
300     @property
301     def preload_basename(self) -> str:
302         return self.module_label
303
304     @property
305     def output_dir(self) -> Path:
306         return self.env.base_dir.joinpath("preloads")
307
308     @property
309     def module_label(self) -> str:
310         return self._module_label
311
312     @property
313     def vf_module_name(self) -> str:
314         return self.get_param("vf_module_name")
315
316     @property
317     def vnf_name(self) -> Optional[str]:
318         return self.get_param("vnf_name")
319
320     @property
321     def vnf_type(self) -> Optional[str]:
322         return self.get_param("vnf-type")
323
324     @property
325     def vf_module_model_name(self) -> Optional[str]:
326         return self.get_param("vf-module-model-name")
327
328     def get_availability_zone(self, index: int, param_name: str) -> Optional[str]:
329         return self.get_param(param_name)
330
331     def get_network_name(self, network_role: str, name_param: str) -> Optional[str]:
332         return self.get_param(name_param)
333
334     def get_subnet_id(
335         self, network_role: str, ip_version: int, param_name: str
336     ) -> Optional[str]:
337         return self.get_param(param_name)
338
339     def get_subnet_name(
340         self, network_role: str, ip_version: int, param_name: str
341     ) -> Optional[str]:
342         # Not supported with env files
343         return None
344
345     def get_vm_name(self, vm_type: str, index: int, param_name: str) -> Optional[str]:
346         return self.get_param(param_name, single=True)
347
348     def get_floating_ip(
349         self, vm_type: str, network_role: str, ip_version: int, param_name: str
350     ) -> Optional[str]:
351         return self.get_param(param_name)
352
353     def get_fixed_ip(
354         self, vm_type: str, network_role: str, ip_version: int, index: int, param: str
355     ) -> Optional[str]:
356         return self.get_param(param, single=True)
357
358     def get_vnf_parameter(self, key: str, value: Any) -> Optional[str]:
359         module_value = self.get_param(key)
360         return module_value or value
361
362     def get_additional_parameters(self) -> Mapping[str, Any]:
363         return {}
364
365     def get_param(self, param_name, single=False):
366         """
367         Retrieves the value for the given param if it exists. If requesting a
368         single item, and the parameter is tied to a list then only one item from
369         the list will be returned.  For each subsequent call with the same parameter
370         it will iterate/rotate through the values in that list.  If single is False
371         then the full list will be returned.
372
373         :param param_name:  name of the parameter
374         :param single:      If True returns single value from lists otherwises the full
375                             list.  This has no effect on non-list values
376         """
377         value = self.env_cache.get(param_name)
378         if not value:
379             value = self.module_params.get(param_name)
380             if isinstance(value, list):
381                 value = value.copy()
382                 value.reverse()
383             self.env_cache[param_name] = value
384
385         if value and single and isinstance(value, list):
386             result = value.pop()
387         else:
388             result = value
389         return result if result != "CHANGEME" else None
390
391
392 class EnvironmentFileDataSource(AbstractPreloadDataSource):
393
394     def __init__(self, path: Path):
395         super().__init__(path)
396         check(path.is_dir(), f"{path} must be an existing directory")
397         self.path = path
398         self.env = PreloadEnvironment(path)
399
400     @classmethod
401     def get_source_type(cls) -> str:
402         return "DIR"
403
404     @classmethod
405     def get_identifier(self) -> str:
406         return "envfiles"
407
408     @classmethod
409     def get_name(self) -> str:
410         return "Environment Files"
411
412     def get_module_preloads(self, module: VnfModule):
413         for env in self.env.environments:
414             module_params = env.get_module(module.label)
415             yield EnvironmentFilePreloadInstance(env, module.label, module_params)