1 # http://www.apache.org/licenses/LICENSE-2.0
2 """CDS onboard module."""
5 from pathlib import Path
6 from typing import Any, Dict
8 from kubernetes import client, config
9 from kubernetes.client.exceptions import ApiException
10 from onapsdk.cds import Blueprint, DataDictionarySet
11 from onapsdk.cds.blueprint import Workflow
12 from onapsdk.cds.blueprint_processor import Blueprintprocessor
13 from onapsdk.configuration import settings
16 from onaptests.steps.base import BaseStep
18 from onaptests.utils.exceptions import OnapTestException
21 class CDSBaseStep(BaseStep, ABC):
22 """Abstract CDS base step."""
# NOTE(review): `component` is declared to return a str, but neither its body
# nor any decorator is present in this excerpt — confirm against the full file
# before relying on its value.
25 def component(self) -> str:
class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep):
    """Expose CDS blueprintsprocessor port.

    ``execute`` patches the blueprints processor HTTP service so that its
    type becomes 'NodePort'; ``cleanup`` reverts that patch.
    """

    def __init__(self, cleanup: bool) -> None:
        """Initialize step and create the Kubernetes API client.

        Use settings values:
         - IN_CLUSTER,
         - K8S_CONFIG.

        Args:
            cleanup (bool): Forwarded unchanged to the parent step initializer.

        """
        super().__init__(cleanup=cleanup)
        self.service_name: str = "cds-blueprints-processor-http"
        if settings.IN_CLUSTER:
            config.load_incluster_config()
        else:
            config.load_kube_config(config_file=settings.K8S_CONFIG)
        self.k8s_client: client.CoreV1Api = client.CoreV1Api()

    @property
    def description(self) -> str:
        """Step description."""
        return "Expose CDS blueprintsprocessor NodePort."

    def is_service_node_port_type(self) -> bool:
        """Check if CDS blueprints processor service type is 'NodePort'.

        Use settings values:
         - K8S_ONAP_NAMESPACE.

        Raises:
            OnapTestException: Kubernetes API error or connection failure

        Returns:
            bool: True if service type is 'NodePort', False otherwise

        """
        try:
            service_data: client.V1Service = self.k8s_client.read_namespaced_service(
                self.service_name,
                settings.K8S_ONAP_NAMESPACE
            )
        except ApiException as exc:
            self._logger.exception("Kubernetes API exception")
            raise OnapTestException from exc
        except urllib3.exceptions.HTTPError as exc:
            # This read is the first Kubernetes call made by both execute()
            # and cleanup(), so connection problems surface here first and
            # must be translated the same way as in the patch calls.
            self._logger.exception("Can't connect with k8s")
            raise OnapTestException from exc
        return service_data.spec.type == "NodePort"

    def _patch_service(self, body: Any) -> None:
        """Patch the CDS service, translating client errors to OnapTestException.

        Args:
            body (Any): Patch document handed to ``patch_namespaced_service``.

        Raises:
            OnapTestException: Kubernetes API error or connection failure

        """
        try:
            self.k8s_client.patch_namespaced_service(
                self.service_name,
                settings.K8S_ONAP_NAMESPACE,
                body
            )
        except ApiException as exc:
            self._logger.exception("Kubernetes API exception")
            raise OnapTestException from exc
        except urllib3.exceptions.HTTPError as exc:
            self._logger.exception("Can't connect with k8s")
            raise OnapTestException from exc

    @BaseStep.store_state
    def execute(self) -> None:
        """Expose CDS blueprintprocessor port using kubernetes client.

        The service is patched only when it is not already of 'NodePort' type.

        Use settings values:
         - K8S_ONAP_NAMESPACE.

        Raises:
            OnapTestException: Kubernetes API error or connection failure

        """
        super().execute()
        if not self.is_service_node_port_type():
            self._patch_service(
                {
                    "spec": {
                        "ports": [{"port": 8080, "nodePort": 30449}],
                        "type": "NodePort"
                    }
                }
            )
        else:
            self._logger.debug("Service already patched, skip")

    def cleanup(self) -> None:
        """Step cleanup.

        Restore CDS blueprintprocessor service.

        Raises:
            OnapTestException: Kubernetes API error or connection failure

        """
        if self.is_service_node_port_type():
            # NOTE(review): only the two "path" entries of this JSON patch were
            # visible; the "op" values and the "ClusterIP" target type were
            # reconstructed — confirm before merging.
            self._patch_service(
                [
                    {
                        "op": "remove",
                        "path": "/spec/ports/0/nodePort"
                    },
                    {
                        "op": "replace",
                        "path": "/spec/type",
                        "value": "ClusterIP"
                    }
                ]
            )
        else:
            self._logger.debug("Service is not 'NodePort' type, skip")
        return super().cleanup()
class BootstrapBlueprintprocessor(CDSBaseStep):
    """Bootstrap blueprintsprocessor."""

    def __init__(self, cleanup: bool = False) -> None:
        """Initialize step.

        Substeps:
            - ExposeCDSBlueprintprocessorNodePortStep (added only when
              settings.EXPOSE_SERVICES_NODE_PORTS is truthy).

        """
        super().__init__(cleanup=cleanup)
        expose_node_ports = settings.EXPOSE_SERVICES_NODE_PORTS
        if expose_node_ports:
            expose_step = ExposeCDSBlueprintprocessorNodePortStep(cleanup=cleanup)
            self.add_step(expose_step)

    @property
    def description(self) -> str:
        """Step description."""
        return "Bootstrap CDS blueprintprocessor"

    @BaseStep.store_state
    def execute(self) -> None:
        """Bootstrap CDS blueprintprocessor."""
        # Parent logic runs first, then the blueprint processor is bootstrapped.
        super().execute()
        Blueprintprocessor.bootstrap()
class DataDictionaryUploadStep(CDSBaseStep):
    """Upload data dictionaries to CDS step."""

    def __init__(self, cleanup: bool = False) -> None:
        """Initialize data dictionary upload step.

        Substeps:
            - BootstrapBlueprintprocessor.

        """
        super().__init__(cleanup=cleanup)
        bootstrap_step = BootstrapBlueprintprocessor(cleanup=cleanup)
        self.add_step(bootstrap_step)

    @property
    def description(self) -> str:
        """Step description."""
        return "Upload data dictionaries to CDS."

    @BaseStep.store_state
    def execute(self) -> None:
        """Upload data dictionary to CDS.

        Use settings values:
         - CDS_DD_FILE.

        """
        super().execute()
        dictionaries: DataDictionarySet = DataDictionarySet.load_from_file(
            settings.CDS_DD_FILE
        )
        # NOTE(review): the statement following the load was not visible in the
        # reviewed excerpt; the upload call is restored from the step's
        # documented purpose — confirm against the full file.
        dictionaries.upload()
class CbaEnrichStep(CDSBaseStep):
    """Enrich CBA file step."""

    def __init__(self, cleanup: bool = False) -> None:
        """Initialize CBA enrichment step.

        Substeps:
            - DataDictionaryUploadStep.

        """
        super().__init__(cleanup=cleanup)
        self.add_step(DataDictionaryUploadStep(cleanup=cleanup))

    @property
    def description(self) -> str:
        """Step description."""
        return "Enrich CBA file."

    @BaseStep.store_state
    def execute(self) -> None:
        """Enrich CBA file.

        Load the unenriched blueprint, enrich it and save the result.

        Use settings values:
         - CDS_CBA_UNENRICHED,
         - CDS_CBA_ENRICHED.

        """
        super().execute()
        blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_UNENRICHED)
        enriched: Blueprint = blueprint.enrich()
        enriched.save(settings.CDS_CBA_ENRICHED)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Cleanup enrichment step.

        Delete enriched CBA file. A file that does not exist (for example
        because enrichment never produced it) is not treated as an error, so
        the remaining cleanup still runs.

        Use settings values:
         - CDS_CBA_ENRICHED.

        """
        try:
            Path(settings.CDS_CBA_ENRICHED).unlink()
        except FileNotFoundError:
            # Nothing to delete; do not let a missing artifact abort cleanup.
            self._logger.debug("Enriched CBA file does not exist, skip")
        super().cleanup()
class CbaPublishStep(CDSBaseStep):
    """Publish CBA file step."""

    def __init__(self, cleanup=False) -> None:
        """Initialize CBA publish step.

        Substeps (at most one is added):
            - CbaEnrichStep, when the unenriched CBA file exists,
            - ExposeCDSBlueprintprocessorNodePortStep, otherwise and only when
              settings.EXPOSE_SERVICES_NODE_PORTS is truthy.

        Use settings values:
         - CDS_CBA_UNENRICHED,
         - EXPOSE_SERVICES_NODE_PORTS.

        """
        super().__init__(cleanup=cleanup)
        # Enrichment is scheduled only when an unenriched CBA exists on disk;
        # without one, enrichment is skipped and only the NodePort exposure
        # may still be needed. Wrapping the setting in Path() keeps the check
        # working whether the setting is a str or already a Path.
        if Path(settings.CDS_CBA_UNENRICHED).is_file():
            self.add_step(CbaEnrichStep(cleanup=cleanup))
        elif settings.EXPOSE_SERVICES_NODE_PORTS:
            self.add_step(ExposeCDSBlueprintprocessorNodePortStep(cleanup=cleanup))

    @property
    def description(self) -> str:
        """Step description."""
        return "Publish CBA file."

    @BaseStep.store_state
    def execute(self) -> None:
        """Publish CBA file.

        Use settings values:
         - CDS_CBA_ENRICHED.

        """
        super().execute()
        blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_ENRICHED)
        # NOTE(review): the statement following the load was not visible in the
        # reviewed excerpt; the publish call is restored from the step's
        # documented purpose — confirm against the full file.
        blueprint.publish()
class CbaProcessStep(CDSBaseStep):
    """Process CBA step."""

    def __init__(self, cleanup=False) -> None:
        """Initialize CBA process step.

        Substeps:
            - CbaPublishStep.

        """
        super().__init__(cleanup=cleanup)
        publish_step = CbaPublishStep(cleanup=cleanup)
        self.add_step(publish_step)

    @property
    def description(self) -> str:
        """Step description."""
        return "Process CBA file."

    @BaseStep.store_state
    def execute(self) -> None:
        """Process CBA file.

        Run the configured workflow of the enriched blueprint and check if
        output is equal to expected.

        Use settings values:
         - CDS_CBA_ENRICHED,
         - CDS_WORKFLOW_NAME,
         - CDS_WORKFLOW_INPUT,
         - CDS_WORKFLOW_EXPECTED_OUTPUT.

        Raises:
            OnapTestException: Workflow output differs from the expected one

        """
        super().execute()
        cba: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_ENRICHED)
        selected_workflow: Workflow = cba.get_workflow_by_name(settings.CDS_WORKFLOW_NAME)
        response: Dict[str, Any] = selected_workflow.execute(settings.CDS_WORKFLOW_INPUT)
        if response != settings.CDS_WORKFLOW_EXPECTED_OUTPUT:
            raise OnapTestException("Response is not equal to the expected one")