Modify Python ONAP SDK tests to not require kubeconfig
[testsuite/pythonsdk-tests.git] / src / onaptests / steps / cloud / k8s_connectivity_info_create.py
1 """Connectivity info creation module."""
2 from jinja2 import Environment, PackageLoader, select_autoescape
3
4 from onapsdk.configuration import settings
5 from onapsdk.exceptions import APIError
6 from onapsdk.k8s import ConnectivityInfo
7 from onapsdk.utils.jinja import jinja_env
8 from onaptests.steps.base import BaseStep
9
10
class K8SConnectivityInfoStep(BaseStep):
    """Step creating the K8S connectivity information for a cloud region."""

    # Token file read when settings.IN_CLUSTER is truthy.
    _SERVICE_ACCOUNT_TOKEN_FILE = "/var/run/secrets/kubernetes.io/serviceaccount/token"

    @property
    def description(self) -> str:
        """Step description."""
        return "Create K8S connectivity info."

    @property
    def component(self) -> str:
        """Component name."""
        return "K8S plugin"

    def _render_in_cluster_kubeconfig(self) -> bytes:
        """Render the packaged kubeconfig template with the service account token.

        Reads the token from ``_SERVICE_ACCOUNT_TOKEN_FILE`` and passes it as
        ``user_token_value`` to the ``kube_config.json.j2`` template.

        Returns:
            bytes: UTF-8 encoded rendered kubeconfig.

        Raises:
            OSError: if the token file cannot be read.

        """
        with open(self._SERVICE_ACCOUNT_TOKEN_FILE, "r", encoding="utf-8") as token_file:
            user_token_value = token_file.read().strip()
        # Named differently from the module-level ``jinja_env`` import so the
        # imported helper is not shadowed inside this method.
        kubeconfig_env = Environment(autoescape=select_autoescape(['json.j2']),
                                     loader=PackageLoader('onaptests.templates', 'kubeconfig'))
        kubeconfig_data = kubeconfig_env.get_template("kube_config.json.j2").render(
            user_token_value=user_token_value
        )
        return kubeconfig_data.encode('utf-8')

    @BaseStep.store_state
    def execute(self):
        """Create k8s connectivity information if the lookup for it fails.

        The connectivity info is looked up by region id; creation happens only
        when that lookup raises ``APIError``.

        Use settings values:
         - CLOUD_REGION_ID,
         - CLOUD_REGION_CLOUD_OWNER,
         - IN_CLUSTER,
         - K8S_CONFIG (only read when IN_CLUSTER is falsy).
        """
        super().execute()
        try:
            self._logger.info("Check if k8s connectivity information exists")
            ConnectivityInfo.get_connectivity_info_by_region_id(
                settings.CLOUD_REGION_ID)
        except APIError:
            # NOTE(review): any APIError is treated as "does not exist" here,
            # not only a not-found response - confirm this is intended.
            if settings.IN_CLUSTER:
                kubeconfig = self._render_in_cluster_kubeconfig()
            else:
                # Context manager guarantees the kubeconfig file is closed.
                with open(settings.K8S_CONFIG, 'rb') as kubeconfig_file:
                    kubeconfig = kubeconfig_file.read()
            # Create the k8s connectivity information with the kubeconfig data
            self._logger.info("Create the k8s connectivity information")
            ConnectivityInfo.create(settings.CLOUD_REGION_ID,
                                    settings.CLOUD_REGION_CLOUD_OWNER,
                                    kubeconfig)

    @BaseStep.store_state(cleanup=True)
    def cleanup(self) -> None:
        """Cleanup K8S Connectivity information.

        Looks up the connectivity info by ``settings.CLOUD_REGION_ID`` and
        deletes it before delegating to the parent cleanup.
        """
        self._logger.info("Clean the k8s connectivity information")
        connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(
            settings.CLOUD_REGION_ID)
        connectinfo.delete()
        super().cleanup()