1 from onapsdk.configuration import settings
2 from onapsdk.msb.k8s import ConnectivityInfo
4 from ..base import BaseStep
6 class K8SConnectivityInfoStep(BaseStep):
7 """CreateConnnectivityInfoStep."""
11 """Creation k8s connectivity information
15 - CLOUD_REGION_CLOUD_OWNER,
16 - K8S_KUBECONFIG_FILE.
19 ######## Create Connectivity Info #########################################
21 self._logger.info("Check if k8s connectivity information exists")
22 ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
24 self._logger.info("Create the k8s connectivity information")
25 ConnectivityInfo.create(settings.CLOUD_REGION_ID,
26 settings.CLOUD_REGION_CLOUD_OWNER,
27 open(settings.K8S_KUBECONFIG_FILE, 'rb').read())
29 def cleanup(self) -> None:
30 """Cleanup K8S Connectivity information.
32 self._logger.info("Clean the k8s connectivity information")
34 connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)