"""Connectivity info creation module."""
+from jinja2 import Environment, PackageLoader, select_autoescape
+
from onapsdk.configuration import settings
from onapsdk.exceptions import APIError
from onapsdk.k8s import ConnectivityInfo
+from onapsdk.utils.jinja import jinja_env
+from onaptests.steps.base import BaseStep
-from ..base import BaseStep
class K8SConnectivityInfoStep(BaseStep):
"""CreateConnnectivityInfoStep."""
######## Create Connectivity Info #########################################
try:
self._logger.info("Check if k8s connectivity information exists")
- ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
+ ConnectivityInfo.get_connectivity_info_by_region_id(
+ settings.CLOUD_REGION_ID)
except APIError:
- self._logger.info("Create the k8s connectivity information")
- ConnectivityInfo.create(settings.CLOUD_REGION_ID,
- settings.CLOUD_REGION_CLOUD_OWNER,
- open(settings.K8S_CONFIG, 'rb').read())
+ if settings.IN_CLUSTER:
+ token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+ with open(token_file, "r") as file:
+ user_token_value = file.read().strip()
+ jinja_env = Environment(autoescape=select_autoescape(['json.j2']),
+ loader=PackageLoader('onaptests.templates', 'kubeconfig'))
+ kubeconfig_data = jinja_env.get_template("kube_config.json.j2").render(
+ user_token_value=user_token_value
+ )
+ # Create the k8s connectivity information with the kubeconfig data
+ self._logger.info("Create the k8s connectivity information")
+ ConnectivityInfo.create(settings.CLOUD_REGION_ID,
+ settings.CLOUD_REGION_CLOUD_OWNER,
+ kubeconfig_data.encode('utf-8'))
+ else:
+ self._logger.info("Create the k8s connectivity information")
+ ConnectivityInfo.create(settings.CLOUD_REGION_ID,
+ settings.CLOUD_REGION_CLOUD_OWNER,
+ open(settings.K8S_CONFIG, 'rb').read())
@BaseStep.store_state(cleanup=True)
def cleanup(self) -> None:
"""Cleanup K8S Connectivity information."""
self._logger.info("Clean the k8s connectivity information")
- connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(settings.CLOUD_REGION_ID)
+ connectinfo = ConnectivityInfo.get_connectivity_info_by_region_id(
+ settings.CLOUD_REGION_ID)
connectinfo.delete()
super().cleanup()
bool: True if PNF simulator pod is running, False otherwise
"""
- config.load_kube_config(settings.K8S_CONFIG)
+ if settings.IN_CLUSTER:
+ config.load_incluster_config()
+ else:
+ config.load_kube_config(config_file=settings.K8S_CONFIG)
k8s_client: "CoreV1API" = client.CoreV1Api()
k8s_watch: "Watch" = watch.Watch()
status = False
Tuple[str, str, str]: VES protocol, IP and port
"""
- config.load_kube_config(settings.K8S_CONFIG)
+ if settings.IN_CLUSTER:
+ config.load_incluster_config()
+ else:
+ config.load_kube_config(config_file=settings.K8S_CONFIG)
k8s_client: "CoreV1API" = client.CoreV1Api()
try:
for service in k8s_client.list_namespaced_service(namespace=settings.K8S_ONAP_NAMESPACE).items: