5 from onapsdk.aai.cloud_infrastructure import CloudRegion, Tenant
6 from onapsdk.aai.business import Customer, ServiceInstance, ServiceSubscription
7 from onapsdk.configuration import settings
8 from onapsdk.sdc.service import Service
9 from onapsdk.vid import LineOfBusiness, Platform
11 import onaptests.utils.exceptions as onap_test_exceptions
12 from ..base import YamlTemplateBaseStep
13 from .service_ala_carte import YamlTemplateServiceAlaCarteInstantiateStep
16 class YamlTemplateVnfAlaCarteInstantiateStep(YamlTemplateBaseStep):
17 """Instantiate vnf a'la carte using YAML template."""
19 def __init__(self, cleanup=False):
23 - YamlTemplateServiceAlaCarteInstantiateStep.
25 super().__init__(cleanup=cleanup)
26 self._yaml_template: dict = None
27 self._service_instance_name: str = None
28 self._service_instance: ServiceInstance = None
29 self.add_step(YamlTemplateServiceAlaCarteInstantiateStep(cleanup))
32 def yaml_template(self) -> dict:
33 """Step YAML template.
35 Load from file if it's a root step, get from parent otherwise.
38 dict: Step YAML template
42 if not self._yaml_template:
43 with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
44 self._yaml_template: dict = load(yaml_template)
45 return self._yaml_template
46 return self.parent.yaml_template
49 def service_name(self) -> str:
52 Get from YAML template if it's a root step, get from parent otherwise.
59 return next(iter(self.yaml_template.keys()))
60 return self.parent.service_name
63 def service_instance_name(self) -> str:
64 """Service instance name.
66 Generate using `service_name` and `uuid4()` function if it's a root step,
67 get from parent otherwise.
70 str: Service instance name
74 if not self._service_instance_name:
75 self._service_instance_name: str = f"{self.service_name}-{str(uuid4())}"
76 return self._service_instance_name
77 return self.parent.service_instance_name
79 @YamlTemplateBaseStep.store_state
88 Exception: VNF instantiation failed
92 service: Service = Service(self.service_name)
93 customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
94 service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
95 self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
96 line_of_business: LineOfBusiness = LineOfBusiness(settings.LINE_OF_BUSINESS)
97 platform: Platform = Platform(settings.PLATFORM)
98 cloud_region: CloudRegion = CloudRegion.get_by_id(
99 cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
100 cloud_region_id=settings.CLOUD_REGION_ID,
102 tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)
103 for idx, vnf in enumerate(service.vnfs):
104 vnf_instantiation = self._service_instance.add_vnf(
110 f"{self.service_instance_name}_vnf_{idx}")
111 while not vnf_instantiation.finished:
113 if vnf_instantiation.failed:
114 raise onap_test_exceptions.VnfInstantiateException
116 def cleanup(self) -> None:
120 Exception: VNF cleaning failed
125 for vnf_instance in self._service_instance.vnf_instances:
126 vnf_deletion = vnf_instance.delete()
130 while not vnf_deletion.finished and nb_try < nb_try_max:
131 self._logger.info("Wait for vnf deletion")
134 if vnf_deletion.finished:
135 self._logger.info("VNF %s deleted", vnf_instance.name)
137 self._logger.error("VNF deletion %s failed", vnf_instance.name)
138 raise onap_test_exceptions.VnfCleanupException