1 """Connectivity info creation module."""
2 from jinja2 import Environment, PackageLoader, select_autoescape
4 from onapsdk.configuration import settings
5 from onapsdk.exceptions import APIError
6 from onapsdk.k8s import ConnectivityInfo
7 from onapsdk.utils.jinja import jinja_env
8 from onaptests.steps.base import BaseStep
11 class K8SConnectivityInfoStep(BaseStep):
12 """CreateConnnectivityInfoStep."""
15 def description(self) -> str:
16 """Step description."""
17 return "Create K8S connectivity info."
20 def component(self) -> str:
26 """Creation k8s connectivity information.
30 - CLOUD_REGION_CLOUD_OWNER,
34 ######## Create Connectivity Info #########################################
36 self._logger.info("Check if k8s connectivity information exists")
37 ConnectivityInfo.get_connectivity_info_by_region_id(
38 settings.CLOUD_REGION_ID)
40 if settings.IN_CLUSTER:
41 token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token"
42 with open(token_file, "r") as file:
43 user_token_value = file.read().strip()
44 jinja_env = Environment(autoescape=select_autoescape(['json.j2']),
45 loader=PackageLoader('onaptests.templates', 'kubeconfig'))
46 kubeconfig_data = jinja_env.get_template("kube_config.json.j2").render(
47 user_token_value=user_token_value
49 # Create the k8s connectivity information with the kubeconfig data
50 self._logger.info("Create the k8s connectivity information")
51 ConnectivityInfo.create(settings.CLOUD_REGION_ID,
52 settings.CLOUD_REGION_CLOUD_OWNER,
53 kubeconfig_data.encode('utf-8'))
55 self._logger.info("Create the k8s connectivity information")
56 ConnectivityInfo.create(settings.CLOUD_REGION_ID,
57 settings.CLOUD_REGION_CLOUD_OWNER,
58 open(settings.K8S_CONFIG, 'rb').read())
60 @BaseStep.store_state(cleanup=True)
61 def cleanup(self) -> None:
62 """Cleanup K8S Connectivity information."""
63 self._logger.info("Clean the k8s connectivity information")
64 connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(
65 settings.CLOUD_REGION_ID)