2 from typing import Any, Dict
3 from yaml import load, SafeLoader
5 import onapsdk.constants as onapsdk_const
6 from onapsdk.configuration import settings
7 from onapsdk.exceptions import APIError, ResourceNotFound
8 from onapsdk.sdc.component import Component
9 from onapsdk.sdc.pnf import Pnf
10 from onapsdk.sdc.properties import ComponentProperty
11 from onapsdk.sdc.service import Service, ServiceInstantiationType
12 from onapsdk.sdc.vf import Vf
13 from onapsdk.sdc.vl import Vl
15 from ..base import BaseStep, YamlTemplateBaseStep
16 from .pnf import PnfOnboardStep, YamlTemplatePnfOnboardStep
17 from .vf import VfOnboardStep, YamlTemplateVfOnboardStep
20 class ServiceOnboardStep(BaseStep):
21 """Service onboard step."""
def __init__(self, cleanup=False):
    """Initialize the step and register its onboarding substeps.

    Substeps:
        - VfOnboardStep, when ``settings.VF_NAME`` is not an empty string.
        - PnfOnboardStep, when ``settings.PNF_NAME`` is not an empty string.

    Args:
        cleanup: Forwarded to the parent initializer and to every substep
            created here.

    """
    super().__init__(cleanup=cleanup)
    # Each setting is read only when its turn comes, VF before PNF, so the
    # substeps are registered in that order.
    substep_for_setting = (
        ("VF_NAME", VfOnboardStep),
        ("PNF_NAME", PnfOnboardStep),
    )
    for setting_name, substep_cls in substep_for_setting:
        if getattr(settings, setting_name) != "":
            self.add_step(substep_cls(cleanup=cleanup))
def description(self) -> str:
    """Return the human-readable description of this step."""
    step_description: str = "Onboard service in SDC."
    return step_description
# NOTE(review): only the signature of component() appears here. The statements
# after it build a Service from settings.SERVICE_NAME and
# settings.SERVICE_INSTANTIATION_TYPE, then add a Vl, Vf and Pnf resource for
# each of settings.VL_NAME / VF_NAME / PNF_NAME that is not an empty string.
# They read like the body of a separate onboarding routine — confirm where its
# header belongs.
# The final condition checks both service.distributed and service.status
# against onapsdk_const.DISTRIBUTED; the existing remark ties this double check
# to python-onapsdk issue 176.
41 def component(self) -> str:
54 - SERVICE_INSTANTIATION_TYPE.
58 service: Service = Service(name=settings.SERVICE_NAME, instantiation_type=settings.SERVICE_INSTANTIATION_TYPE)
59 if not service.created():
61 if settings.VL_NAME != "":
62 vl: Vl = Vl(name=settings.VL_NAME)
63 service.add_resource(vl)
64 if settings.VF_NAME != "":
65 vf: Vf = Vf(name=settings.VF_NAME)
66 service.add_resource(vf)
67 if settings.PNF_NAME != "":
68 pnf: Pnf = Pnf(name=settings.PNF_NAME)
69 service.add_resource(pnf)
70 # If the service is already distributed, do not try to checkin/onboard (replay of tests)
71 # checkin is done if needed
72 # If service is replayed, no need to try to re-onboard the model
73 # Double check because of: https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
74 if not service.distributed and service.status != onapsdk_const.DISTRIBUTED:
80 class YamlTemplateServiceOnboardStep(YamlTemplateBaseStep):
81 """Service onboard using YAML template step."""
def __init__(self, cleanup=False):
    """Initialize the step and register substeps named by the template.

    Substeps:
        - YamlTemplateVfOnboardStep, when the service entry has "vnfs".
        - YamlTemplatePnfOnboardStep, when the service entry has "pnfs".

    Args:
        cleanup: Forwarded to the parent initializer and to every substep
            created here.

    """
    super().__init__(cleanup=cleanup)
    # The template accessors read these two attributes, so they have to
    # exist before the lookups below.
    self._yaml_template = None
    self._model_yaml_template = None
    substep_for_section = (
        ("vnfs", YamlTemplateVfOnboardStep),
        ("pnfs", YamlTemplatePnfOnboardStep),
    )
    for section, substep_cls in substep_for_section:
        if section in self.yaml_template[self.service_name]:
            self.add_step(substep_cls(cleanup=cleanup))
def description(self) -> str:
    """Return the human-readable description of this step."""
    step_description: str = "Onboard service described in YAML file in SDC."
    return step_description
# Accessor annotated to return the component name as a string.
# NOTE(review): no return statement follows the docstring here — confirm the
# value this accessor is meant to return.
103 def component(self) -> str:
104 """Component name."""
# Lookup order visible below: when settings.MODEL_YAML_TEMPLATE is truthy the
# model template is returned; otherwise the file named by
# settings.SERVICE_YAML_TEMPLATE is parsed with SafeLoader, stored on
# self._yaml_template and reused while that attribute is truthy; the last
# statement defers to the parent step.
# NOTE(review): no visible condition separates the cached-template return from
# the parent lookup — presumably a root-step check, per the docstring; confirm.
108 def yaml_template(self) -> dict:
109 """Step YAML template.
111 Load from file if it's a root step, get from parent otherwise.
114 dict: Step YAML template
117 if settings.MODEL_YAML_TEMPLATE:
118 return self.model_yaml_template
120 if not self._yaml_template:
121 with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
122 self._yaml_template: dict = load(yaml_template, SafeLoader)
123 return self._yaml_template
124 return self.parent.yaml_template
# Parses the file named by settings.MODEL_YAML_TEMPLATE, stores the result on
# self._model_yaml_template and reuses it while that attribute is truthy; the
# last statement defers to the parent step.
# NOTE(review): load() is called here without an explicit Loader, whereas
# yaml_template passes SafeLoader. PyYAML 6 makes the Loader argument
# mandatory, and older releases fall back to a less restrictive loader — pass
# SafeLoader here as well.
# NOTE(review): no visible condition separates the cached-template return from
# the parent lookup — presumably a root-step check, per the docstring; confirm.
127 def model_yaml_template(self) -> dict:
128 """Step Model YAML template.
130 Load from file if it's a root step, get from parent otherwise.
133 dict: Step YAML template
137 if not self._model_yaml_template:
138 with open(settings.MODEL_YAML_TEMPLATE, "r") as model_yaml_template:
139 self._model_yaml_template: dict = load(model_yaml_template)
140 return self._model_yaml_template
141 return self.parent.model_yaml_template
# The service name is the first key of the YAML template; the last statement
# defers to the parent step's service name.
# NOTE(review): no visible condition separates the two returns — presumably a
# root-step check, per the description below; confirm.
144 def service_name(self) -> str:
147 Get from YAML template if it's a root step, get from parent otherwise.
154 return next(iter(self.yaml_template.keys()))
155 return self.parent.service_name
# Onboarding routine wrapped by YamlTemplateBaseStep.store_state.
# The instantiation type is built from the template's "instantiation_type"
# entry when that key is present; ServiceInstantiationType.A_LA_CARTE is the
# other assignment (NOTE(review): the branch separating the two assignments is
# not visible — presumably a fallback; confirm).
# A Service is then created from self.service_name and that type, and
# declare_resources() / assign_properties() are called on it — presumably only
# when service.created() is false; confirm.
# The final condition checks both service.distributed and service.status
# against onapsdk_const.DISTRIBUTED, and the handler for APIError /
# ResourceNotFound is described by the existing remarks as a retry workaround.
157 @YamlTemplateBaseStep.store_state
159 """Onboard service."""
161 if "instantiation_type" in self.yaml_template[self.service_name]:
162 instantiation_type: ServiceInstantiationType = ServiceInstantiationType(
163 self.yaml_template[self.service_name]["instantiation_type"])
165 instantiation_type: ServiceInstantiationType = ServiceInstantiationType.A_LA_CARTE
166 service: Service = Service(name=self.service_name, instantiation_type=instantiation_type)
167 if not service.created():
169 self.declare_resources(service)
170 self.assign_properties(service)
171 # If the service is already distributed, do not try to checkin/onboard (replay of tests)
172 # checkin is done if needed
173 # If service is replayed, no need to try to re-onboard the model
174 # Double check because of: https://gitlab.com/Orange-OpenSource/lfn/onap/python-onapsdk/-/issues/176
175 if not service.distributed and service.status != onapsdk_const.DISTRIBUTED:
178 except (APIError, ResourceNotFound):
179 # Retry as checkin may be a bit long
180 # Temp workaround to avoid internal race in SDC
def declare_resources(self, service: Service) -> None:
    """Declare resources.

    Every network, VNF and PNF entry listed for the service in the YAML
    template is added to the service as a resource.

    Args:
        service (Service): Service object

    """
    # (template section, key holding the resource name, resource class),
    # processed in this order: networks, then VNFs, then PNFs.
    resource_kinds = (
        ("networks", "vl_name", Vl),
        ("vnfs", "vnf_name", Vf),
        ("pnfs", "pnf_name", Pnf),
    )
    for section, name_key, resource_cls in resource_kinds:
        if section not in self.yaml_template[self.service_name]:
            continue
        for entry in self.yaml_template[self.service_name][section]:
            service.add_resource(resource_cls(name=entry[name_key]))
def assign_properties(self, service: Service) -> None:
    """Assign components properties.

    For every network, VNF and PNF entry of the YAML template that has a
    "properties" mapping, fetch the matching component from the service
    and set those properties on it.

    Args:
        service (Service): Service object

    """
    # (template section, key holding the resource name, resource class),
    # processed in this order: networks, then VNFs, then PNFs.
    component_kinds = (
        ("networks", "vl_name", Vl),
        ("vnfs", "vnf_name", Vf),
        ("pnfs", "pnf_name", Pnf),
    )
    for section, name_key, resource_cls in component_kinds:
        if section not in self.yaml_template[self.service_name]:
            continue
        for entry in self.yaml_template[self.service_name][section]:
            if "properties" not in entry:
                continue
            resource = resource_cls(name=entry[name_key])
            target: Component = service.get_component(resource)
            self.assign_properties_to_component(target, entry["properties"])
def assign_properties_to_component(self, component: Component, component_properties: Dict[str, Any]) -> None:
    """Assign properties to component.

    Each key of the dictionary is looked up on the component with
    ``get_property`` and the returned property's ``value`` is set to the
    corresponding dictionary value.

    Args:
        component (Component): Component to which properties are going to be assigned
        component_properties (Dict[str, Any]): Properties dictionary

    """
    for name, new_value in component_properties.items():
        component.get_property(name).value = new_value