2 from typing import Iterable
6 from onapsdk.aai.cloud_infrastructure import CloudRegion, Tenant
7 from onapsdk.aai.business import Customer, ServiceInstance, ServiceSubscription
8 from onapsdk.configuration import settings
9 from onapsdk.so.instantiation import VnfParameter
11 import onaptests.utils.exceptions as onap_test_exceptions
12 from ..base import YamlTemplateBaseStep
13 from .vnf_ala_carte import YamlTemplateVnfAlaCarteInstantiateStep
14 from .k8s_profile_create import K8SProfileStep
class YamlTemplateVfModuleAlaCarteInstantiateStep(YamlTemplateBaseStep):
    """Instantiate vf module a'la carte using YAML template."""

    def __init__(self, cleanup=False):
        """Initialize step.

        Substeps:
            - K8SProfileStep when the cloud region type is the K8S one,
            - YamlTemplateVnfAlaCarteInstantiateStep otherwise.

        Args:
            cleanup (bool, optional): Cleanup flag, forwarded to the base
                step and to the sub-step. Defaults to False.

        """
        super().__init__(cleanup=cleanup)

        self._yaml_template: dict = None
        self._service_instance_name: str = None
        self._service_instance: ServiceInstance = None
        if settings.CLOUD_REGION_TYPE == settings.K8S_REGION_TYPE:
            # K8SProfileStep creates the requested profile and then calls
            # YamlTemplateVnfAlaCarteInstantiateStep step, so the VNF step
            # must not be registered a second time here.
            self.add_step(K8SProfileStep(cleanup))
        else:
            self.add_step(YamlTemplateVnfAlaCarteInstantiateStep(cleanup))

    @property
    def yaml_template(self) -> dict:
        """Step YAML template.

        Load from file if it's a root step, get from parent otherwise.

        Returns:
            dict: Step YAML template

        """
        # NOTE(review): root check assumed to be the base step's `is_root`
        # flag (equivalent to "has no parent") - confirm against the base class.
        if self.is_root:
            if not self._yaml_template:
                # NOTE(review): `load` is called without an explicit loader.
                # If it is `yaml.load`, prefer `yaml.safe_load` - recent
                # PyYAML releases require a Loader argument and the default
                # loader is unsafe on untrusted files.
                with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
                    self._yaml_template: dict = load(yaml_template)
            return self._yaml_template
        return self.parent.yaml_template

    @property
    def service_name(self) -> str:
        """Service name.

        Get from YAML template if it's a root step, get from parent otherwise.

        Returns:
            str: Service name (the first key of the YAML template)

        """
        if self.is_root:
            return next(iter(self.yaml_template.keys()))
        return self.parent.service_name

    @property
    def service_instance_name(self) -> str:
        """Service instance name.

        Generate using `service_name` and `uuid4()` function if it's a root step,
        get from parent otherwise. The generated name is cached so that every
        access returns the same value.

        Returns:
            str: Service instance name

        """
        if self.is_root:
            if not self._service_instance_name:
                self._service_instance_name: str = f"{self.service_name}-{uuid4()}"
            return self._service_instance_name
        return self.parent.service_instance_name

    def get_vnf_parameters(self, vnf_name: str) -> Iterable[VnfParameter]:
        """Get VNF parameters from YAML template.

        Args:
            vnf_name (str): VNF name to get parameters for.

        Yields:
            VnfParameter: One parameter per "vnf_parameters" entry of every
                template VNF whose "vnf_name" matches.

        """
        # workaround, as VNF name differs from model name (added " 0"):
        # keep only the first whitespace-separated token
        vnf_name = vnf_name.split()[0]
        for vnf in self.yaml_template[self.service_name]["vnfs"]:
            if vnf["vnf_name"] == vnf_name:
                for vnf_parameter in vnf["vnf_parameters"]:
                    yield VnfParameter(
                        name=vnf_parameter["name"],
                        value=vnf_parameter["value"]
                    )

    @YamlTemplateBaseStep.store_state
    def execute(self) -> None:
        """Instantiate Vf module.

        Looks up the service instance by name, then requests one vf module
        instantiation for every vf module of every VNF instance and waits
        for each request to finish.

        Use settings values:
         - GLOBAL_CUSTOMER_ID,
         - CLOUD_REGION_CLOUD_OWNER,
         - CLOUD_REGION_ID,
         - TENANT_ID.

        Raises:
            Exception: Vf module instantiation failed

        """
        # Local import keeps this method self-contained.
        import time

        # NOTE(review): polling interval (seconds) is an assumption - confirm.
        poll_interval = 10

        # Delegate to the base step first (expected to run the sub-steps
        # registered in __init__).
        super().execute()
        customer: Customer = Customer.get_by_global_customer_id(
            settings.GLOBAL_CUSTOMER_ID)
        service_subscription: ServiceSubscription = \
            customer.get_service_subscription_by_service_type(self.service_name)
        self._service_instance: ServiceInstance = \
            service_subscription.get_service_instance_by_name(self.service_instance_name)
        cloud_region: CloudRegion = CloudRegion.get_by_id(
            cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
            cloud_region_id=settings.CLOUD_REGION_ID,
        )
        tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)

        for vnf_instance in self._service_instance.vnf_instances:
            # possible to have several modules for 1 VNF
            for vf_module in vnf_instance.vnf.vf_modules:
                # NOTE(review): the three leading positional arguments are
                # reconstructed from the otherwise unused locals - confirm
                # against the add_vf_module signature in use.
                vf_module_instantiation = vnf_instance.add_vf_module(
                    vf_module,
                    cloud_region,
                    tenant,
                    self._service_instance_name,
                    vnf_parameters=self.get_vnf_parameters(vnf_instance.vnf.name))
                # Sleep between status checks instead of spinning.
                while not vf_module_instantiation.finished:
                    time.sleep(poll_interval)
                if vf_module_instantiation.failed:
                    raise onap_test_exceptions.VfModuleInstantiateException

    def cleanup(self) -> None:
        """Cleanup Vf module.

        Requests deletion of every vf module of every VNF instance of the
        service instance and waits, for a bounded number of attempts, for
        each deletion to finish.

        Raises:
            Exception: Vf module cleaning failed

        """
        # Local import keeps this method self-contained.
        import time

        # NOTE(review): retry budget and interval (seconds) are assumptions -
        # confirm the intended values.
        nb_try_max = 30
        poll_interval = 20

        for vnf_instance in self._service_instance.vnf_instances:
            self._logger.debug("VNF instance %s found in Service Instance ",
                               vnf_instance.name)
            self._logger.info("Get VF Modules")
            for vf_module in vnf_instance.vf_modules:
                self._logger.info("Delete VF Module %s",
                                  vf_module.name)
                vf_module_deletion = vf_module.delete()
                nb_try = 0

                while not vf_module_deletion.finished and nb_try < nb_try_max:
                    self._logger.info("Wait for vf module deletion")
                    nb_try += 1
                    time.sleep(poll_interval)
                if vf_module_deletion.finished:
                    self._logger.info("VfModule %s deleted", vf_module.name)
                else:
                    # Retry budget exhausted without the deletion finishing.
                    self._logger.error("VfModule deletion %s failed", vf_module.name)
                    raise onap_test_exceptions.VfModuleCleanupException