Modify Python ONAP SDK tests to not require kubeconfig
[testsuite/pythonsdk-tests.git] / src / onaptests / steps / onboard / cds.py
1 # http://www.apache.org/licenses/LICENSE-2.0
2 """CDS onboard module."""
3
4 from abc import ABC
5 from pathlib import Path
6 from typing import Any, Dict
7
8 from kubernetes import client, config
9 from kubernetes.client.exceptions import ApiException
10 from onapsdk.cds import Blueprint, DataDictionarySet
11 from onapsdk.cds.blueprint import Workflow
12 from onapsdk.cds.blueprint_processor import Blueprintprocessor
13 from onapsdk.configuration import settings
14 import urllib3
15
16 from onaptests.steps.base import BaseStep
17
18 from onaptests.utils.exceptions import OnapTestException
19
20
21 class CDSBaseStep(BaseStep, ABC):
22     """Abstract CDS base step."""
23
24     @property
25     def component(self) -> str:
26         """Component name."""
27         return "CDS"
28
29
30 class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep):
31     """Expose CDS blueprintsprocessor port."""
32
33     def __init__(self, cleanup: bool) -> None:
34         """Initialize step."""
35         super().__init__(cleanup=cleanup)
36         self.service_name: str = "cds-blueprints-processor-http"
37         if settings.IN_CLUSTER:
38             config.load_incluster_config()
39         else:
40             config.load_kube_config(config_file=settings.K8S_CONFIG)
41         self.k8s_client: client.CoreV1Api = client.CoreV1Api()
42
43     @property
44     def description(self) -> str:
45         """Step description."""
46         return "Expose CDS blueprintsprocessor NodePort."
47
48     def is_service_node_port_type(self) -> bool:
49         """Check if CDS blueprints processor service type is 'NodePort'
50
51         Raises:
52             OnapTestException: Kubernetes API error
53
54         Returns:
55             bool: True if service type is 'NodePort', False otherwise
56
57         """
58         try:
59             service_data: Dict[str, Any] = self.k8s_client.read_namespaced_service(
60                 self.service_name,
61                 settings.K8S_ONAP_NAMESPACE
62             )
63             return service_data.spec.type == "NodePort"
64         except ApiException:
65             self._logger.exception("Kubernetes API exception")
66             raise OnapTestException
67
68     @BaseStep.store_state
69     def execute(self) -> None:
70         """Expose CDS blueprintprocessor port using kubernetes client.
71
72         Use settings values:
73          - K8S_CONFIG,
74          - K8S_ONAP_NAMESPACE.
75          - EXPOSE_SERVICES_NODE_PORTS
76
77         """
78         super().execute()
79         if not self.is_service_node_port_type():
80             try:
81                 self.k8s_client.patch_namespaced_service(
82                     self.service_name,
83                     settings.K8S_ONAP_NAMESPACE,
84                     {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}}
85                 )
86             except ApiException:
87                 self._logger.exception("Kubernetes API exception")
88                 raise OnapTestException
89             except urllib3.exceptions.HTTPError:
90                 self._logger.exception("Can't connect with k8s")
91                 raise OnapTestException
92         else:
93             self._logger.debug("Service already patched, skip")
94
95     def cleanup(self) -> None:
96         """Step cleanup.
97
98         Restore CDS blueprintprocessor service.
99
100         """
101         if self.is_service_node_port_type():
102             try:
103                 self.k8s_client.patch_namespaced_service(
104                     self.service_name,
105                     settings.K8S_ONAP_NAMESPACE,
106                     [
107                         {
108                             "op": "remove",
109                             "path": "/spec/ports/0/nodePort"
110                         },
111                         {
112                             "op": "replace",
113                             "path": "/spec/type",
114                             "value": "ClusterIP"
115                         }
116                     ]
117                 )
118             except ApiException:
119                 self._logger.exception("Kubernetes API exception")
120                 raise OnapTestException
121             except urllib3.exceptions.HTTPError:
122                 self._logger.exception("Can't connect with k8s")
123                 raise OnapTestException
124         else:
125             self._logger.debug("Service is not 'NodePort' type, skip")
126         return super().cleanup()
127
128
129 class BootstrapBlueprintprocessor(CDSBaseStep):
130     """Bootstrap blueprintsprocessor."""
131
132     def __init__(self, cleanup: bool = False) -> None:
133         """Initialize step.
134
135         Substeps:
136             - ExposeCDSBlueprintprocessorNodePortStep.
137         """
138         super().__init__(cleanup=cleanup)
139         if settings.EXPOSE_SERVICES_NODE_PORTS:
140             self.add_step(ExposeCDSBlueprintprocessorNodePortStep(cleanup=cleanup))
141
142     @property
143     def description(self) -> str:
144         """Step description."""
145         return "Bootstrap CDS blueprintprocessor"
146
147     @BaseStep.store_state
148     def execute(self) -> None:
149         """Bootsrap CDS blueprintprocessor."""
150         super().execute()
151         Blueprintprocessor.bootstrap()
152
153
154 class DataDictionaryUploadStep(CDSBaseStep):
155     """Upload data dictionaries to CDS step."""
156
157     def __init__(self, cleanup: bool = False) -> None:
158         """Initialize data dictionary upload step."""
159         super().__init__(cleanup=cleanup)
160         self.add_step(BootstrapBlueprintprocessor(cleanup=cleanup))
161
162     @property
163     def description(self) -> str:
164         """Step description."""
165         return "Upload data dictionaries to CDS."
166
167     @BaseStep.store_state
168     def execute(self) -> None:
169         """Upload data dictionary to CDS.
170
171         Use settings values:
172          - CDS_DD_FILE.
173
174         """
175         super().execute()
176         dd_set: DataDictionarySet = DataDictionarySet.load_from_file(settings.CDS_DD_FILE)
177         dd_set.upload()
178
179
180 class CbaEnrichStep(CDSBaseStep):
181     """Enrich CBA file step."""
182
183     def __init__(self, cleanup=False) -> None:
184         """Initialize CBA enrichment step."""
185         super().__init__(cleanup=cleanup)
186         self.add_step(DataDictionaryUploadStep(cleanup=cleanup))
187
188     @property
189     def description(self) -> str:
190         """Step description."""
191         return "Enrich CBA file."
192
193     @BaseStep.store_state
194     def execute(self) -> None:
195         """Enrich CBA file.
196
197         Use settings values:
198          - CDS_DD_FILE.
199
200         """
201         super().execute()
202         blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_UNENRICHED)
203         enriched: Blueprint = blueprint.enrich()
204         enriched.save(settings.CDS_CBA_ENRICHED)
205
206     @BaseStep.store_state(cleanup=True)
207     def cleanup(self) -> None:
208         """Cleanup enrichment step.
209
210         Delete enriched CBA file.
211
212         """
213         Path(settings.CDS_CBA_ENRICHED).unlink()
214         super().cleanup()
215
216
217 class CbaPublishStep(CDSBaseStep):
218     """Publish CBA file step."""
219
220     def __init__(self, cleanup=False) -> None:
221         """Initialize CBA publish step."""
222         super().__init__(cleanup=cleanup)
223         """Let's skip enrichment if enriched CBA is already present"""
224         if Path.is_file(settings.CDS_CBA_UNENRICHED):
225             self.add_step(CbaEnrichStep(cleanup=cleanup))
226         elif settings.EXPOSE_SERVICES_NODE_PORTS:
227             self.add_step(ExposeCDSBlueprintprocessorNodePortStep(cleanup=cleanup))
228
229     @property
230     def description(self) -> str:
231         """Step description."""
232         return "Publish CBA file."
233
234     @BaseStep.store_state
235     def execute(self) -> None:
236         """Publish CBA file.
237
238         Use settings values:
239          - CDS_CBA_ENRICHED.
240
241         """
242         super().execute()
243         blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_ENRICHED)
244         blueprint.publish()
245
246
247 class CbaProcessStep(CDSBaseStep):
248     """Process CBA step."""
249
250     def __init__(self, cleanup=False) -> None:
251         """Initialize CBA process step."""
252         super().__init__(cleanup=cleanup)
253         self.add_step(CbaPublishStep(cleanup=cleanup))
254
255     @property
256     def description(self) -> str:
257         """Step description."""
258         return "Process CBA file."
259
260     @BaseStep.store_state
261     def execute(self) -> None:
262         """Process CBA file.
263
264         Check if output is equal to expected
265
266         Use settings values:
267          - CDS_CBA_ENRICHED,
268          - CDS_WORKFLOW_NAME,
269          - CDS_WORKFLOW_INPUT
270
271         """
272         super().execute()
273         blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_ENRICHED)
274         workflow: Workflow = blueprint.get_workflow_by_name(settings.CDS_WORKFLOW_NAME)
275         output: Dict[str, Any] = workflow.execute(settings.CDS_WORKFLOW_INPUT)
276         if not output == settings.CDS_WORKFLOW_EXPECTED_OUTPUT:
277             raise OnapTestException("Response is not equal to the expected one")