1 from typing import Iterable
5 from onapsdk.aai.business import Customer, ServiceInstance, ServiceSubscription
6 from onapsdk.configuration import settings
7 from onapsdk.msb.k8s import Definition
8 from onapsdk.so.instantiation import VnfParameter
10 import onaptests.utils.exceptions as onap_test_exceptions
11 from ..base import BaseStep
12 from .vnf_ala_carte import YamlTemplateVnfAlaCarteInstantiateStep
14 class K8SProfileStep(BaseStep):
15 """CreateK8sProfileStep."""
17 def __init__(self, cleanup=False):
20 super().__init__(cleanup=cleanup)
22 self._yaml_template: dict = None
23 self._service_instance_name: str = None
24 self._service_instance: ServiceInstance = None
25 self.add_step(YamlTemplateVnfAlaCarteInstantiateStep(cleanup))
28 def yaml_template(self) -> dict:
29 """Step YAML template.
31 Load from file if it's a root step, get from parent otherwise.
34 dict: Step YAML template
38 if not self._yaml_template:
39 with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
40 self._yaml_template: dict = load(yaml_template)
41 return self._yaml_template
42 return self.parent.yaml_template
45 def service_name(self) -> str:
48 Get from YAML template if it's a root step, get from parent otherwise.
55 return next(iter(self.yaml_template.keys()))
56 return self.parent.service_name
59 def service_instance_name(self) -> str:
60 """Service instance name.
62 Generate using `service_name` and `uuid4()` function if it's a root step,
63 get from parent otherwise.
66 str: Service instance name
70 if not self._service_instance_name:
71 self._service_instance_name: str = f"{self.service_name}-{str(uuid4())}"
72 return self._service_instance_name
73 return self.parent.service_instance_name
75 def get_vnf_parameters(self, vnf_name: str) -> Iterable[VnfParameter]:
76 """Get VNF parameters from YAML template.
79 vnf_name (str): VNF name to get parameters for.
82 Iterator[Iterable[VnfParameter]]: VNF parameter
86 # workaround, as VNF name differs from model name (added " 0")
87 vnf_name = vnf_name.split()[0]
88 for vnf in self.yaml_template[self.service_name]["vnfs"]:
89 if vnf["vnf_name"] == vnf_name:
90 for vnf_parameter in vnf["vnf_parameters"]:
92 name=vnf_parameter["name"],
93 value=vnf_parameter["value"]
98 """Creation of k8s profile for resource bundle definition
102 - K8S_PROFILE_K8S_VERSION
103 - K8S_PROFILE_ARTIFACT_PATH.
105 self._logger.info("Create the k8s profile if it doesn't exist")
107 customer: Customer = Customer.get_by_global_customer_id(settings.GLOBAL_CUSTOMER_ID)
108 service_subscription: ServiceSubscription = customer.get_service_subscription_by_service_type(self.service_name)
109 self._service_instance: ServiceInstance = service_subscription.get_service_instance_by_name(self.service_instance_name)
111 for vnf_instance in self._service_instance.vnf_instances:
112 # possible to have several modules for 1 VNF
113 for vf_module in vnf_instance.vnf.vf_modules:
114 # Define profile (rb_profile) for resource bundle definition
115 # Retrieve resource bundle definition (rbdef) corresponding to vf module
116 rbdef_name = vf_module.metadata["vfModuleModelInvariantUUID"]
117 rbdef_version = vf_module.metadata["vfModuleModelUUID"]
118 rbdef = Definition.get_definition_by_name_version(rbdef_name, rbdef_version)
119 # Get k8s profile name from yaml service template
120 vnf_parameters = self.get_vnf_parameters(vnf_instance.vnf.name)
121 k8s_profile_name = ""
122 k8s_profile_namespace = ""
123 for param in vnf_parameters:
124 if param.name == "k8s-rb-profile-name":
125 k8s_profile_name = param.value
126 if param.name == "k8s-rb-profile-namespace":
127 k8s_profile_namespace = param.value
128 if k8s_profile_name == "" or k8s_profile_namespace == "":
129 self._logger.error("Missing rb profile information")
130 raise onap_test_exceptions.ProfileInformationException
132 ######## Check profile for Definition ###################################
134 rbdef.get_profile_by_name(k8s_profile_name)
136 ######## Create profile for Definition ###################################
137 profile = rbdef.create_profile(k8s_profile_name,
138 k8s_profile_namespace,
139 settings.K8S_PROFILE_K8S_VERSION)
140 ####### Upload artifact for created profile ##############################
141 profile.upload_artifact(open(settings.K8S_PROFILE_ARTIFACT_PATH, 'rb').read())
143 def cleanup(self) -> None:
144 """Cleanup K8S profiles.
146 self._logger.info("Clean the k8s profile")
147 for vnf_instance in self._service_instance.vnf_instances:
148 # possible to have several modules for 1 VNF
149 for vf_module in vnf_instance.vnf.vf_modules:
150 # Retrieve resource bundle definition (rbdef) corresponding to vf module
151 rbdef_name = vf_module.metadata["vfModuleModelInvariantUUID"]
152 rbdef_version = vf_module.metadata["vfModuleModelUUID"]
153 rbdef = Definition.get_definition_by_name_version(rbdef_name, rbdef_version)
154 # Get k8s profile name from yaml service template
155 vnf_parameters = self.get_vnf_parameters(vnf_instance.vnf.name)
156 k8s_profile_name = ""
157 for param in vnf_parameters:
158 if param.name == "k8s-rb-profile-name":
159 k8s_profile_name = param.value
160 if k8s_profile_name == "":
161 self._logger.error("K8s profile deletion failed, missing rb profile name")
162 raise onap_test_exceptions.ProfileInformationException
163 ######## Delete profile for Definition ###################################
165 profile = rbdef.get_profile_by_name(k8s_profile_name)
168 self._logger.error("K8s profile deletion %s failed", k8s_profile_name)
169 raise onap_test_exceptions.ProfileCleanupException