Modify Python ONAP SDK tests to not require kubeconfig
[testsuite/pythonsdk-tests.git] / src / onaptests / steps / cloud / k8s_connectivity_info_create.py
index dfcda05..8c361e4 100644 (file)
@@ -1,9 +1,12 @@
 """Connectivity info creation module."""
+from jinja2 import Environment, PackageLoader, select_autoescape
+
 from onapsdk.configuration import settings
 from onapsdk.exceptions import APIError
 from onapsdk.k8s import ConnectivityInfo
+from onapsdk.utils.jinja import jinja_env
+from onaptests.steps.base import BaseStep
 
-from ..base import BaseStep
 
 class K8SConnectivityInfoStep(BaseStep):
     """CreateConnnectivityInfoStep."""
@@ -31,17 +34,34 @@ class K8SConnectivityInfoStep(BaseStep):
         ######## Create Connectivity Info #########################################
         try:
             self._logger.info("Check if k8s connectivity information exists")
-            ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
+            ConnectivityInfo.get_connectivity_info_by_region_id(
+                settings.CLOUD_REGION_ID)
         except APIError:
-            self._logger.info("Create the k8s connectivity information")
-            ConnectivityInfo.create(settings.CLOUD_REGION_ID,
-                                    settings.CLOUD_REGION_CLOUD_OWNER,
-                                    open(settings.K8S_CONFIG, 'rb').read())
+            if settings.IN_CLUSTER:
+                token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+                with open(token_file, "r") as file:
+                    user_token_value = file.read().strip()
+                jinja_env = Environment(autoescape=select_autoescape(['json.j2']),
+                                        loader=PackageLoader('onaptests.templates', 'kubeconfig'))
+                kubeconfig_data = jinja_env.get_template("kube_config.json.j2").render(
+                    user_token_value=user_token_value
+                )
+                # Create the k8s connectivity information with the kubeconfig data
+                self._logger.info("Create the k8s connectivity information")
+                ConnectivityInfo.create(settings.CLOUD_REGION_ID,
+                                        settings.CLOUD_REGION_CLOUD_OWNER,
+                                        kubeconfig_data.encode('utf-8'))
+            else:
+                self._logger.info("Create the k8s connectivity information")
+                ConnectivityInfo.create(settings.CLOUD_REGION_ID,
+                                        settings.CLOUD_REGION_CLOUD_OWNER,
+                                        open(settings.K8S_CONFIG, 'rb').read())
 
     @BaseStep.store_state(cleanup=True)
     def cleanup(self) -> None:
         """Cleanup K8S Connectivity information."""
         self._logger.info("Clean the k8s connectivity information")
-        connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
+        connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(
+            settings.CLOUD_REGION_ID)
         connectinfo.delete()
         super().cleanup()