[VVP] preload generation enhancements
[vvp/validation-scripts.git] / ice_validator / preload / environment.py
1 import os
2 import re
3 import tempfile
4 from pathlib import Path
5 from typing import Any, Optional, Mapping
6
7 from cached_property import cached_property
8
9 from preload.data import AbstractPreloadInstance, AbstractPreloadDataSource
10 from preload.model import VnfModule
11 from tests.helpers import check, first, unzip, load_yaml
12
13 SERVICE_TEMPLATE_PATTERN = re.compile(r".*service-.*?-template.yml")
14 RESOURCE_TEMPLATE_PATTERN = re.compile(r".*resource-(.*?)-template.yml")
15
16 ZONE_PARAMS = ("availability_zone_0", "availability_zone_1", "availability_zone_2")
17
18
19 def yaml_files(path):
20     """
21     Return files that are YAML (end with .yml or .yaml)
22
23     :param path: Directory path object
24     :return: list of paths to YAML files
25     """
26     return [
27         p
28         for p in path.iterdir()
29         if p.is_file() and p.suffix.lower() in (".yml", ".yaml")
30     ]
31
32
33 class CloudServiceArchive:
34     """
35     Wrapper to extract information from a CSAR file.
36     """
37
38     def __init__(self, csar_path):
39         self.csar_path = Path(csar_path)
40         with tempfile.TemporaryDirectory() as csar_dir:
41             csar_dir = Path(csar_dir)
42             unzip(self.csar_path, csar_dir)
43             self._service = self._get_service_template(csar_dir)
44             self._resources = self._get_vf_module_resource_templates(csar_dir)
45
46     def get_vf_module(self, vf_module):
47         """
48         Retrieve the VF Module definition from the CSAR for the given heat
49         module name (should not include the file extension - ex: base)
50
51         :param vf_module: name of Heat module (no path or file extension)
52         :return: The definition of the module as a dict or None if not found
53         """
54         if(
55             vf_module.endswith(".env")
56             or vf_module.endswith(".yaml")
57             or vf_module.endswith(".yml")
58         ):
59             vf_module = os.path.splitext(vf_module)[0]
60         groups = self._service.get("topology_template", {}).get("groups", {})
61         for props in groups.values():
62             module_label = props.get("properties", {}).get("vf_module_label", "")
63             if module_label.lower() == vf_module.lower():
64                 return props
65         return None
66
67     def get_vf_module_model_name(self, vf_module):
68         """
69         Retrieves the vfModuleModelName of the module or None if vf_module is not
70         found (see get_vf_module)
71
72         :param vf_module: name of Heat module (no path or file extension)
73         :return: The value if vfModuleModelName as string or None if not found
74         """
75         module = self.get_vf_module(vf_module)
76         return module.get("metadata", {}).get("vfModuleModelName") if module else None
77
78     @property
79     def topology_template(self):
80         """
81         Return dict representing the topology_template node of the service
82         template
83         """
84         return self._service.get("topology_template") or {}
85
86     @property
87     def groups(self):
88         """
89         Return dict representing the groups node of the service
90         template
91         """
92         return self.topology_template.get("groups") or {}
93
94     @property
95     def vf_modules(self):
96         """
97         Returns mapping of group ID to VfModule present in the service template
98         """
99         return {
100             group_id: props
101             for group_id, props in self.groups.items()
102             if props.get("type") == "org.openecomp.groups.VfModule"
103         }
104
105     def get_vnf_type(self, module):
106         """
107         Concatenation of service and VF instance name
108         """
109         service_name = self.service_name
110         instance_name = self.get_vf_module_resource_name(module)
111         if service_name and instance_name:
112             return "{}/{} 0".format(service_name, instance_name)
113
114     @property
115     def vf_module_resource_names(self):
116         """
117         Returns the resource names for all VfModules (these can be used
118         to find the resource templates as they will be part of the filename)
119         """
120         names = (
121             module.get("metadata", {}).get("vfModuleModelName")
122             for module in self.vf_modules.values()
123         )
124         return [name.split(".")[0] for name in names if name]
125
126     def get_vf_module_resource_name(self, vf_module):
127         """
128         Retrieves the resource name of the module or None if vf_module is not
129         found (see get_vf_module)
130
131         :param vf_module: name of Heat module (no path or file extension)
132         :return: The value if resource nae as string or None if not found
133         """
134         vf_model_name = self.get_vf_module_model_name(vf_module)
135         if not vf_model_name:
136             return None
137         resource_name = vf_model_name.split(".")[0]
138         resource = self._resources.get(resource_name, {})
139         return resource.get("metadata", {}).get("name")
140
141     @staticmethod
142     def _get_definition_files(csar_dir):
143         """
144         Returns a list of all files in the CSAR's Definitions directory
145         """
146         def_dir = csar_dir / "Definitions"
147         check(
148             def_dir.exists(),
149             "CSAR is invalid. {} does not contain a Definitions directory.".format(
150                 csar_dir.as_posix()
151             ),
152         )
153         return yaml_files(def_dir)
154
155     def _get_service_template(self, csar_dir):
156         """
157         Returns the service template as a dict.  Assumes there is only one.
158         """
159         files = map(str, self._get_definition_files(csar_dir))
160         service_template = first(files, SERVICE_TEMPLATE_PATTERN.match)
161         return load_yaml(service_template) if service_template else {}
162
163     def _get_vf_module_resource_templates(self, csar_dir):
164         """
165         Returns a mapping of resource name to resource definition (as a dict)
166         (Only loads resource templates that correspond to VF Modules
167         """
168         def_dir = csar_dir / "Definitions"
169         mapping = (
170             (name, def_dir / "resource-{}-template.yml".format(name))
171             for name in self.vf_module_resource_names
172         )
173         return {name: load_yaml(path) for name, path in mapping if path.exists()}
174
175     @property
176     def service_name(self):
177         """
178         Name of the service (extracted from the service template
179         """
180         return self._service.get("metadata", {}).get("name")
181
182     def __repr__(self):
183         return "CSAR (path={}, name={})".format(self.csar_path.name, self.service_name)
184
185     def __str__(self):
186         return repr(self)
187
188
189 class PreloadEnvironment:
190     def __init__(self, env_dir, parent=None):
191         self.base_dir = Path(env_dir)
192         self.parent = parent
193         self._modules = self._load_modules()
194         self._sub_env = self._load_envs()
195         self._defaults = self._load_defaults()
196
197     def _load_defaults(self):
198         defaults = self.base_dir / "defaults.yaml"
199         return load_yaml(defaults) if defaults.exists() else {}
200
201     def _load_modules(self):
202         files = [
203             p
204             for p in self.base_dir.iterdir()
205             if p.is_file() and p.suffix.lower().endswith(".env")
206         ]
207         return {f.name.lower(): load_yaml(f).get("parameters", {}) for f in files}
208
209     def _load_envs(self):
210         env_dirs = [
211             p for p in self.base_dir.iterdir() if p.is_dir() and p.name != "preloads"
212         ]
213         return {d.name: PreloadEnvironment(d, self) for d in env_dirs}
214
215     @cached_property
216     def csar(self):
217         csar_path = first(self.base_dir.iterdir(), lambda p: p.suffix == ".csar")
218         if csar_path:
219             return CloudServiceArchive(csar_path)
220         else:
221             return self.parent.csar if self.parent else None
222
223     @property
224     def defaults(self):
225         result = {}
226         if self.parent:
227             result.update(self.parent.defaults)
228         result.update(self._defaults)
229         return result
230
231     @property
232     def environments(self):
233         all_envs = [self]
234         for env in self._sub_env.values():
235             all_envs.append(env)
236             all_envs.extend(env.environments)
237         return [e for e in all_envs if e.is_leaf]
238
239     def get_module(self, name):
240         name = name if name.lower().endswith(".env") else "{}.env".format(name).lower()
241         if name not in self.module_names:
242             return {}
243         result = {}
244         parent_module = self.parent.get_module(name) if self.parent else None
245         module = self._modules.get(name)
246         for m in (parent_module, self.defaults, module):
247             if m:
248                 result.update(m)
249         if self.csar:
250             vnf_type = self.csar.get_vnf_type(name)
251             if vnf_type:
252                 result["vnf-type"] = vnf_type
253             model_name = self.csar.get_vf_module_model_name(name)
254             if model_name:
255                 result["vf-module-model-name"] = model_name
256         return result
257
258     @property
259     def module_names(self):
260         parent_modules = self.parent.module_names if self.parent else set()
261         result = set()
262         result.update(self._modules.keys())
263         result.update(parent_modules)
264         return result
265
266     @property
267     def modules(self):
268         return {name: self.get_module(name) for name in self.module_names}
269
270     def get_environment(self, env_name):
271         for name, env in self._sub_env.items():
272             if name == env_name:
273                 return env
274             result = env.get_environment(env_name)
275             if result:
276                 return result
277         return None
278
279     @property
280     def is_base(self):
281         return self.parent is None
282
283     @property
284     def is_leaf(self):
285         return not self._sub_env
286
287     @property
288     def name(self):
289         return self.base_dir.name
290
291     def __repr__(self):
292         return "PreloadEnvironment(name={})".format(self.name)
293
294
295 class EnvironmentFilePreloadInstance(AbstractPreloadInstance):
296
297     def __init__(self, env: PreloadEnvironment, module_label: str, module_params: dict):
298         self.module_params = module_params
299         self._module_label = module_label
300         self.env = env
301         self.env_cache = {}
302
303     @property
304     def flag_incompletes(self) -> bool:
305         return True
306
307     @property
308     def preload_basename(self) -> str:
309         return self.module_label
310
311     @property
312     def output_dir(self) -> Path:
313         return self.env.base_dir.joinpath("preloads")
314
315     @property
316     def module_label(self) -> str:
317         return self._module_label
318
319     @property
320     def vf_module_name(self) -> str:
321         return self.get_param("vf_module_name")
322
323     @property
324     def vnf_name(self) -> Optional[str]:
325         return self.get_param("vnf_name")
326
327     @property
328     def vnf_type(self) -> Optional[str]:
329         return self.get_param("vnf-type")
330
331     @property
332     def vf_module_model_name(self) -> Optional[str]:
333         return self.get_param("vf-module-model-name")
334
335     def get_availability_zone(self, index: int, param_name: str) -> Optional[str]:
336         return self.get_param(param_name)
337
338     def get_network_name(self, network_role: str, name_param: str) -> Optional[str]:
339         return self.get_param(name_param)
340
341     def get_subnet_id(
342         self, network_role: str, ip_version: int, param_name: str
343     ) -> Optional[str]:
344         return self.get_param(param_name)
345
346     def get_subnet_name(
347         self, network_role: str, ip_version: int, param_name: str
348     ) -> Optional[str]:
349         # Not supported with env files
350         return None
351
352     def get_vm_name(self, vm_type: str, index: int, param_name: str) -> Optional[str]:
353         return self.get_param(param_name, single=True)
354
355     def get_floating_ip(
356         self, vm_type: str, network_role: str, ip_version: int, param_name: str
357     ) -> Optional[str]:
358         return self.get_param(param_name)
359
360     def get_fixed_ip(
361         self, vm_type: str, network_role: str, ip_version: int, index: int, param: str
362     ) -> Optional[str]:
363         return self.get_param(param, single=True)
364
365     def get_vnf_parameter(self, key: str, value: Any) -> Optional[str]:
366         module_value = self.get_param(key)
367         return module_value or value
368
369     def get_additional_parameters(self) -> Mapping[str, Any]:
370         return {}
371
372     def get_param(self, param_name, single=False):
373         """
374         Retrieves the value for the given param if it exists. If requesting a
375         single item, and the parameter is tied to a list then only one item from
376         the list will be returned.  For each subsequent call with the same parameter
377         it will iterate/rotate through the values in that list.  If single is False
378         then the full list will be returned.
379
380         :param param_name:  name of the parameter
381         :param single:      If True returns single value from lists otherwises the full
382                             list.  This has no effect on non-list values
383         """
384         value = self.env_cache.get(param_name)
385         if not value:
386             value = self.module_params.get(param_name)
387             if isinstance(value, list):
388                 value = value.copy()
389                 value.reverse()
390             self.env_cache[param_name] = value
391
392         if value and single and isinstance(value, list):
393             result = value.pop()
394         else:
395             result = value
396         return result if result != "CHANGEME" else None
397
398
399 class EnvironmentFileDataSource(AbstractPreloadDataSource):
400
401     def __init__(self, path: Path):
402         super().__init__(path)
403         check(path.is_dir(), f"{path} must be an existing directory")
404         self.path = path
405         self.env = PreloadEnvironment(path)
406
407     @classmethod
408     def get_source_type(cls) -> str:
409         return "DIR"
410
411     @classmethod
412     def get_identifier(self) -> str:
413         return "envfiles"
414
415     @classmethod
416     def get_name(self) -> str:
417         return "Environment Files"
418
419     def get_module_preloads(self, module: VnfModule):
420         for env in self.env.environments:
421             module_params = env.get_module(module.label)
422             yield EnvironmentFilePreloadInstance(env, module.label, module_params)