[VVP] Preload Generation Enhancements and Fixes
[vvp/validation-scripts.git] / ice_validator / preload / environment.py
1 import re
2 import tempfile
3 from pathlib import Path
4
5 from cached_property import cached_property
6
7 from tests.helpers import check, first, unzip, load_yaml
8
9 SERVICE_TEMPLATE_PATTERN = re.compile(r".*service-.*?-template.yml")
10 RESOURCE_TEMPLATE_PATTERN = re.compile(r".*resource-(.*?)-template.yml")
11
12
13 def yaml_files(path):
14     """
15     Return files that are YAML (end with .yml or .yaml)
16
17     :param path: Directory path object
18     :return: list of paths to YAML files
19     """
20     return [
21         p
22         for p in path.iterdir()
23         if p.is_file() and p.suffix.lower() in (".yml", ".yaml")
24     ]
25
26
27 class CloudServiceArchive:
28     """
29     Wrapper to extract information from a CSAR file.
30     """
31
32     def __init__(self, csar_path):
33         self.csar_path = Path(csar_path)
34         with tempfile.TemporaryDirectory() as csar_dir:
35             csar_dir = Path(csar_dir)
36             unzip(self.csar_path, csar_dir)
37             self._service = self._get_service_template(csar_dir)
38             self._resources = self._get_vf_module_resource_templates(csar_dir)
39
40     def get_vf_module(self, vf_module):
41         """
42         Retrieve the VF Module definition from the CSAR for the given heat
43         module name (should not include the file extension - ex: base)
44
45         :param vf_module: name of Heat module (no path or file extension)
46         :return: The definition of the module as a dict or None if not found
47         """
48         groups = self._service.get("topology_template", {}).get("groups", {})
49         for props in groups.values():
50             module_label = props.get("properties", {}).get("vf_module_label", "")
51             if module_label.lower() == vf_module.lower():
52                 return props
53         return None
54
55     def get_vf_module_model_name(self, vf_module):
56         """
57         Retrieves the vfModuleModelName of the module or None if vf_module is not
58         found (see get_vf_module)
59
60         :param vf_module: name of Heat module (no path or file extension)
61         :return: The value if vfModuleModelName as string or None if not found
62         """
63         module = self.get_vf_module(vf_module)
64         return module.get("metadata", {}).get("vfModuleModelName") if module else None
65
66     @property
67     def topology_template(self):
68         """
69         Return dict representing the topology_template node of the service
70         template
71         """
72         return self._service.get("topology_template") or {}
73
74     @property
75     def groups(self):
76         """
77         Return dict representing the groups node of the service
78         template
79         """
80         return self.topology_template.get("groups") or {}
81
82     @property
83     def vf_modules(self):
84         """
85         Returns mapping of group ID to VfModule present in the service template
86         """
87         return {
88             group_id: props
89             for group_id, props in self.groups.items()
90             if props.get("type") == "org.openecomp.groups.VfModule"
91         }
92
93     def get_vnf_type(self, module):
94         """
95         Concatenation of service and VF instance name
96         """
97         service_name = self.service_name
98         instance_name = self.get_vf_module_resource_name(module)
99         if service_name and instance_name:
100             return "{}/{}".format(service_name, instance_name)
101
102     @property
103     def vf_module_resource_names(self):
104         """
105         Returns the resource names for all VfModules (these can be used
106         to find the resource templates as they will be part of the filename)
107         """
108         names = (
109             module.get("metadata", {}).get("vfModuleModelName")
110             for module in self.vf_modules.values()
111         )
112         return [name.split(".")[0] for name in names if name]
113
114     def get_vf_module_resource_name(self, vf_module):
115         """
116         Retrieves the resource name of the module or None if vf_module is not
117         found (see get_vf_module)
118
119         :param vf_module: name of Heat module (no path or file extension)
120         :return: The value if resource nae as string or None if not found
121         """
122         vf_model_name = self.get_vf_module_model_name(vf_module)
123         if not vf_model_name:
124             return None
125         resource_name = vf_model_name.split(".")[0]
126         resource = self._resources.get(resource_name, {})
127         return resource.get("metadata", {}).get("name")
128
129     @staticmethod
130     def _get_definition_files(csar_dir):
131         """
132         Returns a list of all files in the CSAR's Definitions directory
133         """
134         def_dir = csar_dir / "Definitions"
135         check(
136             def_dir.exists(),
137             "CSAR is invalid. {} does not contain a Definitions directory.".format(
138                 csar_dir.as_posix()
139             ),
140         )
141         return yaml_files(def_dir)
142
143     def _get_service_template(self, csar_dir):
144         """
145         Returns the service template as a dict.  Assumes there is only one.
146         """
147         files = map(str, self._get_definition_files(csar_dir))
148         service_template = first(files, SERVICE_TEMPLATE_PATTERN.match)
149         return load_yaml(service_template) if service_template else {}
150
151     def _get_vf_module_resource_templates(self, csar_dir):
152         """
153         Returns a mapping of resource name to resource definition (as a dict)
154         (Only loads resource templates that correspond to VF Modules
155         """
156         def_dir = csar_dir / "Definitions"
157         mapping = (
158             (name, def_dir / "resource-{}-template.yml".format(name))
159             for name in self.vf_module_resource_names
160         )
161         return {name: load_yaml(path) for name, path in mapping if path.exists()}
162
163     @property
164     def service_name(self):
165         """
166         Name of the service (extracted from the service template
167         """
168         return self._service.get("metadata", {}).get("name")
169
170     def __repr__(self):
171         return f"CSAR (path={self.csar_path.name}, name={self.service_name})"
172
173     def __str__(self):
174         return repr(self)
175
176
177 class PreloadEnvironment:
178
179     def __init__(self, env_dir, parent=None):
180         self.base_dir = Path(env_dir)
181         self.parent = parent
182         self._modules = self._load_modules()
183         self._sub_env = self._load_envs()
184         self._defaults = self._load_defaults()
185
186     def _load_defaults(self):
187         defaults = self.base_dir / "defaults.yaml"
188         return load_yaml(defaults) if defaults.exists() else {}
189
190     def _load_modules(self):
191         files = [
192             p
193             for p in self.base_dir.iterdir()
194             if p.is_file() and p.suffix.lower().endswith(".env")
195         ]
196         return {f.name.lower(): load_yaml(f).get("parameters", {}) for f in files}
197
198     def _load_envs(self):
199         env_dirs = [
200             p for p in self.base_dir.iterdir() if p.is_dir() and p.name != "preloads"
201         ]
202         return {d.name: PreloadEnvironment(d, self) for d in env_dirs}
203
204     @cached_property
205     def csar(self):
206         csar_path = first(self.base_dir.iterdir(), lambda p: p.suffix == ".csar")
207         if csar_path:
208             return CloudServiceArchive(csar_path)
209         else:
210             return self.parent.csar if self.parent else None
211
212     @property
213     def defaults(self):
214         result = {}
215         if self.parent:
216             result.update(self.parent.defaults)
217         result.update(self._defaults)
218         return result
219
220     @property
221     def environments(self):
222         all_envs = [self]
223         for env in self._sub_env.values():
224             all_envs.append(env)
225             all_envs.extend(env.environments)
226         return [e for e in all_envs if e.is_leaf]
227
228     def get_module(self, name):
229         name = name if name.lower().endswith(".env") else f"{name}.env".lower()
230         if name not in self.module_names:
231             return {}
232         result = {}
233         parent_module = self.parent.get_module(name) if self.parent else None
234         module = self._modules.get(name)
235         for m in (parent_module, self.defaults, module):
236             if m:
237                 result.update(m)
238         if self.csar:
239             vnf_type = self.csar.get_vnf_type(name)
240             if vnf_type:
241                 result["vnf-type"] = vnf_type
242             model_name = self.csar.get_vf_module_model_name(name)
243             if model_name:
244                 result["vf-module-model-name"] = model_name
245         return result
246
247     @property
248     def module_names(self):
249         parent_modules = self.parent.module_names if self.parent else set()
250         result = set()
251         result.update(self._modules.keys())
252         result.update(parent_modules)
253         return result
254
255     @property
256     def modules(self):
257         return {name: self.get_module(name) for name in self.module_names}
258
259     def get_environment(self, env_name):
260         for name, env in self._sub_env.items():
261             if name == env_name:
262                 return env
263             result = env.get_environment(env_name)
264             if result:
265                 return result
266         return None
267
268     @property
269     def is_base(self):
270         return self.parent is None
271
272     @property
273     def is_leaf(self):
274         return not self._sub_env
275
276     @property
277     def name(self):
278         return self.base_dir.name
279
280     def __repr__(self):
281         return f"PreloadEnvironment(name={self.name})"