Revert "Revert "Create basic_cnf test leveraging onapsdk""
[testsuite/pythonsdk-tests.git] / src / onaptests / steps / instantiate / k8s_profile_create.py
1 from typing import Iterable
2 from uuid import uuid4
3 from yaml import load
4
5 from onapsdk.aai.business import Customer, ServiceInstance, ServiceSubscription
6 from onapsdk.configuration import settings
7 from onapsdk.msb.k8s import Definition
8 from onapsdk.so.instantiation import VnfParameter
9
10 import onaptests.utils.exceptions as onap_test_exceptions
11 from ..base import BaseStep
12 from .vnf_ala_carte import YamlTemplateVnfAlaCarteInstantiateStep
13
14 class K8SProfileStep(BaseStep):
15     """CreateK8sProfileStep."""
16
17     def __init__(self, cleanup=False):
18         """Initialize step.
19         """
20         super().__init__(cleanup=cleanup)
21
22         self._yaml_template: dict = None
23         self._service_instance_name: str = None
24         self._service_instance: ServiceInstance = None
25         self.add_step(YamlTemplateVnfAlaCarteInstantiateStep(cleanup))
26
27     @property
28     def yaml_template(self) -> dict:
29         """Step YAML template.
30
31         Load from file if it's a root step, get from parent otherwise.
32
33         Returns:
34             dict: Step YAML template
35
36         """
37         if self.is_root:
38             if not self._yaml_template:
39                 with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
40                     self._yaml_template: dict = load(yaml_template)
41             return self._yaml_template
42         return self.parent.yaml_template
43
44     @property
45     def service_name(self) -> str:
46         """Service name.
47
48         Get from YAML template if it's a root step, get from parent otherwise.
49
50         Returns:
51             str: Service name
52
53         """
54         if self.is_root:
55             return next(iter(self.yaml_template.keys()))
56         return self.parent.service_name
57
58     @property
59     def service_instance_name(self) -> str:
60         """Service instance name.
61
62         Generate using `service_name` and `uuid4()` function if it's a root step,
63             get from parent otherwise.
64
65         Returns:
66             str: Service instance name
67
68         """
69         if self.is_root:
70             if not self._service_instance_name:
71                 self._service_instance_name: str = f"{self.service_name}-{str(uuid4())}"
72             return self._service_instance_name
73         return self.parent.service_instance_name
74
75     def get_vnf_parameters(self, vnf_name: str) -> Iterable[VnfParameter]:
76         """Get VNF parameters from YAML template.
77
78         Args:
79             vnf_name (str): VNF name to get parameters for.
80
81         Yields:
82             Iterator[Iterable[VnfParameter]]: VNF parameter
83
84         """
85
86         # workaround, as VNF name differs from model name (added " 0")
87         vnf_name = vnf_name.split()[0]
88         for vnf in self.yaml_template[self.service_name]["vnfs"]:
89             if vnf["vnf_name"] == vnf_name:
90                 for vnf_parameter in vnf["vnf_parameters"]:
91                     yield VnfParameter(
92                         name=vnf_parameter["name"],
93                         value=vnf_parameter["value"]
94                     )
95
96     @BaseStep.store_state
97     def execute(self):
98         """Creation of k8s profile for resource bundle definition
99
100         Use settings values:
101          - GLOBAL_CUSTOMER_ID
102          - K8S_PROFILE_K8S_VERSION
103          - K8S_PROFILE_ARTIFACT_PATH.
104         """
105         self._logger.info("Create the k8s profile if it doesn't exist")
106         super().execute()
107         customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
108         service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
109         self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
110
111         for vnf_instance in self._service_instance.vnf_instances:
112             # possible to have several modules for 1 VNF
113             for vf_module in vnf_instance.vnf.vf_modules:
114                 # Define profile (rb_profile) for resource bundle definition
115                 # Retrieve resource bundle definition (rbdef) corresponding to vf module
116                 rbdef_name = vf_module.metadata["vfModuleModelInvariantUUID"]
117                 rbdef_version = vf_module.metadata["vfModuleModelUUID"]
118                 rbdef = Definition.get_definition_by_name_version(rbdef_name, rbdef_version)
119                 # Get k8s profile name from yaml service template
120                 vnf_parameters = self.get_vnf_parameters(vnf_instance.vnf.name)
121                 k8s_profile_name = ""
122                 k8s_profile_namespace = ""
123                 for param in vnf_parameters:
124                     if param.name == "k8s-rb-profile-name":
125                         k8s_profile_name = param.value
126                     if param.name == "k8s-rb-profile-namespace":
127                         k8s_profile_namespace = param.value
128                 if k8s_profile_name == "" or k8s_profile_namespace == "":
129                     self._logger.error("Missing rb profile information")
130                     raise onap_test_exceptions.ProfileInformationException
131
132                 ######## Check profile for Definition ###################################
133                 try:
134                     rbdef.get_profile_by_name(k8s_profile_name)
135                 except ValueError:
136                     ######## Create profile for Definition ###################################
137                     profile = rbdef.create_profile(k8s_profile_name,
138                                                    k8s_profile_namespace,
139                                                    settings.K8S_PROFILE_K8S_VERSION)
140                     ####### Upload artifact for created profile ##############################
141                     profile.upload_artifact(open(settings.K8S_PROFILE_ARTIFACT_PATH, 'rb').read())
142
143     def cleanup(self) -> None:
144         """Cleanup K8S profiles.
145         """
146         self._logger.info("Clean the k8s profile")
147         for vnf_instance in self._service_instance.vnf_instances:
148             # possible to have several modules for 1 VNF
149             for vf_module in vnf_instance.vnf.vf_modules:
150                 # Retrieve resource bundle definition (rbdef) corresponding to vf module
151                 rbdef_name = vf_module.metadata["vfModuleModelInvariantUUID"]
152                 rbdef_version = vf_module.metadata["vfModuleModelUUID"]
153                 rbdef = Definition.get_definition_by_name_version(rbdef_name, rbdef_version)
154                 # Get k8s profile name from yaml service template
155                 vnf_parameters = self.get_vnf_parameters(vnf_instance.vnf.name)
156                 k8s_profile_name = ""
157                 for param in vnf_parameters:
158                     if param.name == "k8s-rb-profile-name":
159                         k8s_profile_name = param.value
160                 if k8s_profile_name == "":
161                     self._logger.error("K8s profile deletion failed, missing rb profile name")
162                     raise onap_test_exceptions.ProfileInformationException
163                 ######## Delete profile for Definition ###################################
164                 try:
165                     profile = rbdef.get_profile_by_name(k8s_profile_name)
166                     profile.delete()
167                 except ValueError:
168                     self._logger.error("K8s profile deletion %s failed", k8s_profile_name)
169                     raise onap_test_exceptions.ProfileCleanupException
170         super().cleanup()