2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
28 # https://creativecommons.org/licenses/by/4.0/
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
36 # ============LICENSE_END============================================
49 from tests import cached_yaml as yaml
50 from tests.helpers import load_yaml
52 from .utils import nested_dict
56 # key = pattern, value = regex compiled from pattern
def _get_regex(pattern):
    """Return a compiled version of `pattern`.

    Compiled regexes are kept in the module-level ``_REGEX_CACHE`` dict
    (key = pattern, value = regex compiled from pattern) to avoid
    re-compiling the same pattern.
    """
    regex = _REGEX_CACHE.get(pattern)
    if regex is None:
        # First request for this pattern: compile it and remember it.
        regex = re.compile(pattern)
        _REGEX_CACHE[pattern] = regex
    return regex
class HeatObject(object):
    """Base class for xxxx::xxxx::xxxx objects.

    Subclasses set `resource_type` and override `get_re_rids` to describe
    the resource id (rid) formats they accept.
    """

    # Falsy here; subclasses that represent a concrete resource set a string.
    resource_type = None

    def __init__(self):
        self.re_rids = self.get_re_rids()

    @staticmethod
    def get_re_rids():
        """Return OrderedDict of name: regex.

        Each regex parses the proper format for a given rid.
        The base class has no formats, so the dict is empty.
        """
        return collections.OrderedDict()

    def get_rid_match_tuple(self, rid):
        """Find the first regex matching `rid` and return the tuple
        (name, match object) or ('', None) if no match.
        """
        for name, regex in self.re_rids.items():
            match = regex.match(rid)
            if match:
                return name, match
        return "", None

    def get_rid_patterns(self):
        """Return OrderedDict of name: friendly regex.pattern.

        "friendly" means the group notation is replaced with
        braces, and a trailing "$" is removed.

        NOTE
        nested parentheses in any rid_pattern will break this parser.
        """
        friendly_pattern = _get_regex(r"\(\?P<(.*?)>.*?\)")
        rid_patterns = collections.OrderedDict()
        for name, regex in self.re_rids.items():
            # replace named groups with braces: (?P<x>...) -> {x}
            friendly = friendly_pattern.sub(r"{\1}", regex.pattern)
            # Only strip a real end anchor; a pattern without one must not
            # lose its last character.
            if friendly.endswith("$"):
                friendly = friendly[:-1]
            rid_patterns[name] = friendly
        return rid_patterns
class ContrailV2NetworkHeatObject(HeatObject):
    """ContrailV2 objects which have network_flavor."""

    network_flavor_external = "external"
    network_flavor_internal = "internal"
    network_flavor_subint = "subint"

    def get_network_flavor(self, resource):
        """Return the network flavor of `resource`.

        resource.properties.virtual_network_refs should be a list.
        All the parameters in the list should have the same "flavor"
        so the flavor is determined from the first item:

        "internal" - get_resource, or get_param contains "_int_" or
                     starts with "int_"
        "subint"   - get_param contains "_subint_"
        "external" - any other get_param string
        None       - no parameters found to decide the flavor (the list is
                     missing, empty or not a list, its first item is not a
                     dict, or the get_param value is not a string).
        """
        network_refs = nested_dict.get(resource, "properties", "virtual_network_refs")
        if not network_refs or not isinstance(network_refs, list):
            return None

        param = network_refs[0]
        if not isinstance(param, dict):
            return None
        if "get_resource" in param:
            return self.network_flavor_internal

        p = param.get("get_param")
        if not isinstance(p, str):
            return None
        # The "_int_" test comes first, as in the original if/elif chain.
        if "_int_" in p or p.startswith("int_"):
            return self.network_flavor_internal
        if "_subint_" in p:
            return self.network_flavor_subint
        return self.network_flavor_external
156 class ContrailV2InstanceIp(ContrailV2NetworkHeatObject):
157 """ ContrailV2 InstanceIp
160 resource_type = "OS::ContrailV2::InstanceIp"
162 def get_re_rids(self):
163 """Return OrderedDict of name: regex
165 return collections.OrderedDict(
171 r"_(?P<vm_type_index>\d+)"
173 r"_(?P<network_role>.+)"
175 r"_(?P<vmi_index>\d+)"
185 r"_(?P<vm_type_index>\d+)"
187 r"_(?P<network_role>.+)"
189 r"_(?P<vmi_index>\d+)"
199 r"_(?P<vm_type_index>\d+)"
201 r"_(?P<network_role>.+)"
203 r"_(?P<vmi_index>\d+)"
213 r"_(?P<vm_type_index>\d+)"
215 r"_(?P<network_role>.+)"
217 r"_(?P<vmi_index>\d+)"
227 r"_(?P<vm_type_index>\d+)"
228 r"_(?P<network_role>.+)"
230 r"_(?P<vmi_index>\d+)"
240 r"_(?P<vm_type_index>\d+)"
241 r"_(?P<network_role>.+)"
243 r"_(?P<vmi_index>\d+)"
class ContrailV2InterfaceRouteTable(HeatObject):
    """ContrailV2 InterfaceRouteTable resource."""

    resource_type = "OS::ContrailV2::InterfaceRouteTable"
class ContrailV2NetworkIpam(HeatObject):
    """ContrailV2 NetworkIpam resource."""

    resource_type = "OS::ContrailV2::NetworkIpam"
class ContrailV2PortTuple(HeatObject):
    """ContrailV2 PortTuple resource."""

    resource_type = "OS::ContrailV2::PortTuple"
class ContrailV2ServiceHealthCheck(HeatObject):
    """ContrailV2 ServiceHealthCheck resource."""

    resource_type = "OS::ContrailV2::ServiceHealthCheck"
class ContrailV2ServiceInstance(HeatObject):
    """ContrailV2 ServiceInstance resource."""

    resource_type = "OS::ContrailV2::ServiceInstance"
class ContrailV2ServiceInstanceIp(HeatObject):
    """ContrailV2 ServiceInstanceIp resource."""

    resource_type = "OS::ContrailV2::ServiceInstanceIp"
class ContrailV2ServiceTemplate(HeatObject):
    """ContrailV2 ServiceTemplate resource."""

    resource_type = "OS::ContrailV2::ServiceTemplate"
302 class ContrailV2VirtualMachineInterface(ContrailV2NetworkHeatObject):
303 """ ContrailV2 Virtual Machine Interface resource
306 resource_type = "OS::ContrailV2::VirtualMachineInterface"
308 def get_re_rids(self):
309 """Return OrderedDict of name: regex
311 return collections.OrderedDict(
317 r"_(?P<vm_type_index>\d+)"
319 r"_(?P<network_role>.+)"
321 r"_(?P<vmi_index>\d+)"
329 r"_(?P<vm_type_index>\d+)"
331 r"_(?P<network_role>.+)"
333 r"_(?P<vmi_index>\d+)"
341 r"_(?P<vm_type_index>\d+)"
342 r"_(?P<network_role>.+)"
344 r"_(?P<vmi_index>\d+)"
352 class ContrailV2VirtualNetwork(HeatObject):
353 """ ContrailV2 VirtualNetwork
356 resource_type = "OS::ContrailV2::VirtualNetwork"
358 def get_re_rids(self):
359 """Return OrderedDict of name: regex
361 return collections.OrderedDict(
365 _get_regex(r"int" r"_(?P<network_role>.+)" r"_network" r"$"),
367 ("rvn", _get_regex(r"int" r"_(?P<network_role>.+)" r"_RVN" r"$")),
class NeutronNet(HeatObject):
    """Neutron Net resource."""

    resource_type = "OS::Neutron::Net"

    def get_re_rids(self):
        """Return OrderedDict of name: regex.

        The single "network" entry accepts rids of the form
        int_{network_role}_network.
        """
        return collections.OrderedDict(
            [("network", _get_regex(r"int" r"_(?P<network_role>.+)" r"_network" r"$"))]
        )
386 class NeutronPort(HeatObject):
387 """ Neutron Port resource
390 resource_type = "OS::Neutron::Port"
392 def get_re_rids(self):
393 """Return OrderedDict of name: regex
395 return collections.OrderedDict(
401 r"_(?P<vm_type_index>\d+)"
403 r"_(?P<network_role>.+)"
404 r"_port_(?P<port_index>\d+)"
412 r"_(?P<vm_type_index>\d+)"
413 r"_(?P<network_role>.+)"
414 r"_port_(?P<port_index>\d+)"
423 r"_(?P<network_role>.+)"
424 r"_floating_ip_(?P<index>\d+)"
433 r"_(?P<network_role>.+)"
434 r"_floating_v6_ip_(?P<index>\d+)"
442 class NovaServer(HeatObject):
443 """ Nova Server resource
446 resource_type = "OS::Nova::Server"
448 def get_re_rids(self):
449 """Return OrderedDict of name: regex
451 return collections.OrderedDict(
456 r"(?P<vm_type>.+)" r"_server_(?P<vm_type_index>\d+)" r"$"
465 filepath - absolute path to template file.
envpath - absolute path to environment file.
469 type_cdl = "comma_delimited_list"
def __init__(self, filepath=None, envpath=None):
    """Initialise every template attribute to None, then load on request.

    filepath - when truthy, the template is loaded through `self.load`.
    envpath - when truthy, the environment is loaded through `self.load_env`.
    """
    # Attributes populated by load(); None until a template is loaded.
    self.filepath = None
    self.basename = None
    self.dirname = None
    self.yml = None
    self.heat_template_version = None
    self.description = None
    self.parameter_groups = None
    self.parameters = None
    self.resources = None
    self.outputs = None
    self.conditions = None
    if filepath:
        self.load(filepath)
    # Populated by load_env(); None when no environment file is given.
    self.env = None
    if envpath:
        self.load_env(envpath)
    self.heat_objects = self.get_heat_objects()
@property
def contrail_resources(self):
    """This attribute is a dict of Contrail resources.

    Only resources whose type is
    ``ContrailV2VirtualMachineInterface.resource_type`` are returned;
    key is resource_id, value is resource.
    """
    return self.get_resource_by_type(
        resource_type=ContrailV2VirtualMachineInterface.resource_type
    )
@staticmethod
def get_heat_objects():
    """Return a dict, key is resource_type, value is the
    HeatObject subclass whose resource_type is the key.

    The dict is the module-level ``_HEAT_OBJECTS`` registry.
    """
    return _HEAT_OBJECTS
def get_resource_by_type(self, resource_type):
    """Return dict of resources whose type is `resource_type`.

    key is resource_id, value is resource.
    Reads ``self.resources`` and compares each resource's "type" (looked
    up with ``self.nested_get``) against `resource_type`.
    """
    return {
        rid: resource
        for rid, resource in self.resources.items()
        if self.nested_get(resource, "type") == resource_type
    }
def get_rid_match_tuple(self, rid, resource_type):
    """Return get_rid_match_tuple(rid) called on the class
    corresponding to the given resource_type.

    The class is looked up in ``self.heat_objects``; an unknown
    resource_type falls back to the ``HeatObject`` base class.
    """
    hoc = self.heat_objects.get(resource_type, HeatObject)
    # A fresh instance is created for every call.
    return hoc().get_rid_match_tuple(rid)
524 def get_vm_type(self, rid, resource=None):
525 """return the vm_type
529 resource_type = self.nested_get(resource, "type")
530 match = self.get_rid_match_tuple(rid, resource_type)[1]
531 vm_type = match.groupdict().get("vm_type") if match else None
def load(self, filepath):
    """Load the Heat template given a filepath.

    Records the path together with its basename and dirname, parses the
    file with ``yaml.load`` and copies the top-level template sections
    onto ``self``. ``parameters``, ``resources``, ``outputs`` and
    ``conditions`` are stored as ``{}`` when the section is missing or
    falsy.
    """
    self.filepath = filepath
    self.basename = os.path.basename(self.filepath)
    self.dirname = os.path.dirname(self.filepath)
    with open(self.filepath) as fi:
        self.yml = yaml.load(fi)
    self.heat_template_version = self.yml.get("heat_template_version", None)
    self.description = self.yml.get("description", "")
    self.parameter_groups = self.yml.get("parameter_groups", {})
    # `.get(key, {})` still yields None for a key that is present without a
    # value, so normalise with `or {}`: code that iterates these sections
    # with .items() must always get a dict.
    self.parameters = self.yml.get("parameters") or {}
    self.resources = self.yml.get("resources") or {}
    self.outputs = self.yml.get("outputs") or {}
    self.conditions = self.yml.get("conditions") or {}
def get_all_resources(self, base_dir):
    """
    Like ``resources``, but this returns all the resources definitions
    defined in the template, resource groups, and nested YAML files.

    base_dir - directory that nested filenames are joined to.
    Returns a new dict, key is resource_id, value is resource.
    """
    resources = {}
    for r_id, r_data in self.resources.items():
        resources[r_id] = r_data
        resource = Resource(r_id, r_data)
        if resource.is_nested():
            # Load the nested template and merge its resources, recursively.
            nested = Heat(os.path.join(base_dir, resource.get_nested_filename()))
            resources.update(nested.get_all_resources(base_dir))
    return resources
def load_env(self, envpath):
    """
    Load the Environment template given a envpath.

    The result is stored on ``self.env`` as an ``Env`` instance.
    """
    self.env = Env(filepath=envpath)
@staticmethod
def nested_get(dic, *keys, **kwargs):
    """Make utils.nested_dict.get available as a class method.

    All arguments are forwarded unchanged to ``nested_dict.get``.
    """
    return nested_dict.get(dic, *keys, **kwargs)
@property
def neutron_port_resources(self):
    """This attribute is a dict of Neutron Ports.

    Holds every resource whose type is ``NeutronPort.resource_type``;
    key is resource_id, value is resource.
    """
    return self.get_resource_by_type(resource_type=NeutronPort.resource_type)
@property
def nova_server_resources(self):
    """This attribute is a dict of Nova Servers.

    Holds every resource whose type is ``NovaServer.resource_type``;
    key is resource_id, value is resource.
    """
    return self.get_resource_by_type(resource_type=NovaServer.resource_type)
@staticmethod
def part_is_in_name(part, name):
    """
    Return True if any of
    - name starts with part + '_'
    - name contains '_' + part + '_'
    - name ends with '_' + part
    False otherwise.

    `part` is matched literally.
    """
    # Escape `part` so regex metacharacters in it (".", "+", "(" ...) cannot
    # change what is matched or raise re.error.
    pattern = "(^(%(x)s)_)|(_(%(x)s)_)|(_(%(x)s)$)" % dict(x=re.escape(part))
    return bool(re.search(pattern, name))
603 """An Environment file
class Resource(object):
    """A single resource of a Heat template.

    resource_id - the resource's key in the template's resources.
    resource - the resource's definition (a dict).
    """

    def __init__(self, resource_id=None, resource=None):
        self.resource_id = resource_id or ""
        self.resource = resource or {}
        self.properties = self.resource.get("properties", {})
        # Read from the normalised self.resource, not the raw argument, so
        # Resource() / resource=None does not raise AttributeError.
        self.resource_type = self.resource.get("type", "")

    @staticmethod
    def get_index_var(resource):
        """Return the index_var for this resource.

        Falls back to "index" when properties.index_var is missing or falsy.
        """
        return nested_dict.get(resource, "properties", "index_var") or "index"

    def get_nested_filename(self):
        """Returns the filename of the nested YAML file if the
        resource is a nested YAML or ResourceDef, returns '' otherwise.

        For an OS::Heat::ResourceGroup the type of its resource_def is
        inspected instead of the resource's own type. A filename is any
        type string ending in .yml or .yaml (case-insensitive).
        """
        typ = self.resource.get("type", "")
        if typ == "OS::Heat::ResourceGroup":
            rd = nested_dict.get(self.resource, "properties", "resource_def")
            typ = rd.get("type", "") if rd else ""
        if not isinstance(typ, str):
            # e.g. a "type" key with no value; it cannot name a file.
            return ""
        ext = os.path.splitext(typ)[1].lower()
        return typ if ext in (".yml", ".yaml") else ""

    def get_nested_properties(self):
        """
        Returns {} if not nested
        Returns resource: properties if nested
        Returns resource: properties: resource_def: properties if RG
        """
        if not self.get_nested_filename():
            return {}
        if self.resource_type == "OS::Heat::ResourceGroup":
            return nested_dict.get(
                self.properties, "resource_def", "properties", default={}
            )
        return self.properties

    @property
    def depends_on(self):
        """
        Returns the list of resources this resource depends on. Always
        returns a list.

        :return: list of all resource IDs this resource depends on. If none,
                 then returns an empty list
        """
        parents = self.resource.get("depends_on", [])
        # A single dependency may be given as a bare value; wrap it.
        return parents if isinstance(parents, list) else [parents]

    def is_nested(self):
        """Returns True if the resource represents a Nested YAML resource
        using either type: {filename} or ResourceGroup -> resource_def"""
        return bool(self.get_nested_filename())

    def get_nested_yaml(self, base_dir):
        """If the resource represents a Nested YAML resource, then it
        returns the loaded YAML. If the resource is not nested or the
        file cannot be found, then an empty dict is returned"""
        filename = self.get_nested_filename()
        if not filename:
            return {}
        file_path = os.path.join(base_dir, filename)
        return load_yaml(file_path) if os.path.exists(file_path) else {}
def _get_heat_objects():
    """
    Introspect this module and return a dict of all HeatObject sub-classes with
    a (True) resource_type. Key is the resource_type, value is the
    corresponding class.
    """
    mod_classes = inspect.getmembers(sys.modules[__name__], inspect.isclass)
    return {
        c.resource_type: c
        for _, c in mod_classes
        # issubclass() is also true for HeatObject itself; the truthiness
        # test drops every class without a resource_type.
        if issubclass(c, HeatObject) and c.resource_type
    }
# Registry of HeatObject subclasses keyed by resource_type, built once when
# the module is imported. _get_heat_objects() introspects the module as it is
# at call time, so this statement has to run after the classes are defined.
_HEAT_OBJECTS = _get_heat_objects()