Revert "Revert "Create basic_cnf test leveraging onapsdk""
[testsuite/pythonsdk-tests.git] / src / onaptests / steps / instantiate / vnf_ala_carte.py
1 import time
2 from uuid import uuid4
3 from yaml import load
4
5 from onapsdk.aai.cloud_infrastructure import CloudRegion, Tenant
6 from onapsdk.aai.business import Customer, ServiceInstance, ServiceSubscription
7 from onapsdk.configuration import settings
8 from onapsdk.sdc.service import Service
9 from onapsdk.vid import LineOfBusiness, Platform
10
11 import onaptests.utils.exceptions as onap_test_exceptions
12 from ..base import YamlTemplateBaseStep
13 from .service_ala_carte import YamlTemplateServiceAlaCarteInstantiateStep
14
15
16 class YamlTemplateVnfAlaCarteInstantiateStep(YamlTemplateBaseStep):
17     """Instantiate vnf a'la carte using YAML template."""
18
19     def __init__(self, cleanup=False):
20         """Initialize step.
21
22         Substeps:
23             - YamlTemplateServiceAlaCarteInstantiateStep.
24         """
25         super().__init__(cleanup=cleanup)
26         self._yaml_template: dict = None
27         self._service_instance_name: str = None
28         self._service_instance: ServiceInstance = None
29         self.add_step(YamlTemplateServiceAlaCarteInstantiateStep(cleanup))
30
31     @property
32     def yaml_template(self) -> dict:
33         """Step YAML template.
34
35         Load from file if it's a root step, get from parent otherwise.
36
37         Returns:
38             dict: Step YAML template
39
40         """
41         if self.is_root:
42             if not self._yaml_template:
43                 with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
44                     self._yaml_template: dict = load(yaml_template)
45             return self._yaml_template
46         return self.parent.yaml_template
47
48     @property
49     def service_name(self) -> str:
50         """Service name.
51
52         Get from YAML template if it's a root step, get from parent otherwise.
53
54         Returns:
55             str: Service name
56
57         """
58         if self.is_root:
59             return next(iter(self.yaml_template.keys()))
60         return self.parent.service_name
61
62     @property
63     def service_instance_name(self) -> str:
64         """Service instance name.
65
66         Generate using `service_name` and `uuid4()` function if it's a root step,
67             get from parent otherwise.
68
69         Returns:
70             str: Service instance name
71
72         """
73         if self.is_root:
74             if not self._service_instance_name:
75                 self._service_instance_name: str = f"{self.service_name}-{str(uuid4())}"
76             return self._service_instance_name
77         return self.parent.service_instance_name
78
79     @YamlTemplateBaseStep.store_state
80     def execute(self):
81         """Instantiate vnf.
82
83         Use settings values:
84          - GLOBAL_CUSTOMER_ID,
85          - LINE_OF_BUSINESS.
86
87         Raises:
88             Exception: VNF instantiation failed
89
90         """
91         super().execute()
92         service: Service = Service(self.service_name)
93         customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
94         service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
95         self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
96         line_of_business: LineOfBusiness = LineOfBusiness(settings.LINE_OF_BUSINESS)
97         platform: Platform = Platform(settings.PLATFORM)
98         cloud_region: CloudRegion = CloudRegion.get_by_id(
99             cloud_owner=settings.CLOUD_REGION_CLOUD_OWNER,
100             cloud_region_id=settings.CLOUD_REGION_ID,
101         )
102         tenant: Tenant = cloud_region.get_tenant(settings.TENANT_ID)
103         for idx, vnf in enumerate(service.vnfs):
104             vnf_instantiation = self._service_instance.add_vnf(
105                 vnf,
106                 line_of_business,
107                 platform,
108                 cloud_region,
109                 tenant,
110                 f"{self.service_instance_name}_vnf_{idx}")
111             while not vnf_instantiation.finished:
112                 time.sleep(10)
113             if vnf_instantiation.failed:
114                 raise onap_test_exceptions.VnfInstantiateException
115
116     def cleanup(self) -> None:
117         """Cleanup VNF.
118
119         Raises:
120             Exception: VNF cleaning failed
121
122         """
123         for vnf_instance in self._service_instance.vnf_instances:
124             vnf_deletion = vnf_instance.delete()
125             nb_try = 0
126             nb_try_max = 30
127
128             while not vnf_deletion.finished and nb_try < nb_try_max:
129                 self._logger.info("Wait for vnf deletion")
130                 nb_try += 1
131                 time.sleep(15)
132             if vnf_deletion.finished:
133                 self._logger.info("VNF %s deleted", vnf_instance.name)
134             else:
135                 self._logger.error("VNF deletion %s failed", vnf_instance.name)
136                 raise onap_test_exceptions.VnfCleanupException
137         super.cleanup()