from onapsdk.cds import Blueprint, DataDictionarySet
from onapsdk.cds.blueprint_processor import Blueprintprocessor
from onapsdk.configuration import settings
+import urllib3
from ..base import BaseStep
+from onaptests.utils.exceptions import OnapTestException
class CDSBaseStep(BaseStep, ABC):
super().execute()
config.load_kube_config(settings.K8S_CONFIG)
self.k8s_client = client.CoreV1Api()
- self.k8s_client.patch_namespaced_service(
- self.service_name,
- settings.K8S_NAMESPACE,
- {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}}
- )
+ try:
+ self.k8s_client.patch_namespaced_service(
+ self.service_name,
+ settings.K8S_NAMESPACE,
+ {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}}
+ )
+ except urllib3.exceptions.HTTPError:
+ self._logger.exception("Can't connect with k8s")
+ raise OnapTestException
def cleanup(self) -> None:
"""Step cleanup.
Restore CDS blueprintprocessor service.
"""
- self.k8s_client.patch_namespaced_service(
- self.service_name,
- settings.K8S_NAMESPACE,
- [
- {
- "op": "remove",
- "path": "/spec/ports/0/nodePort"
- },
- {
- "op": "replace",
- "path": "/spec/type",
- "value": "ClusterIP"
- }
- ]
- )
- return super().cleanup()
+ try:
+ self.k8s_client.patch_namespaced_service(
+ self.service_name,
+ settings.K8S_NAMESPACE,
+ [
+ {
+ "op": "remove",
+ "path": "/spec/ports/0/nodePort"
+ },
+ {
+ "op": "replace",
+ "path": "/spec/type",
+ "value": "ClusterIP"
+ }
+ ]
+ )
+ return super().cleanup()
+ except urllib3.exceptions.HTTPError:
+ self._logger.exception("Can't connect with k8s")
+ raise OnapTestException
class BootstrapBlueprintprocessor(CDSBaseStep):
import requests
from kubernetes import client, config, watch
from onapsdk.configuration import settings
+import urllib3
from onaptests.steps.base import BaseStep
from onaptests.steps.instantiate.msb_k8s import CreateInstanceStep
"""Component name."""
return "Environment"
- @staticmethod
- def is_pnf_pod_running(timeout_seconds=120) -> bool:
+ def is_pnf_pod_running(self, timeout_seconds=120) -> bool:
"""Check if PNF simulator pod is running.
Args:
config.load_kube_config(settings.K8S_CONFIG)
k8s_client: "CoreV1API" = client.CoreV1Api()
k8s_watch: "Watch" = watch.Watch()
- for event in k8s_watch.stream(k8s_client.list_namespaced_pod,
- namespace=settings.K8S_NAMESPACE,
- timeout_seconds=timeout_seconds):
- if event["object"].metadata.name == "pnf-simulator":
- if not event["object"].status.phase in ["Pending", "Running"]:
- # Invalid pod state
- return False
- return event["object"].status.phase == "Running"
- return False
-
- @staticmethod
- def get_ves_ip_and_port() -> Tuple[str, str]:
+ try:
+ for event in k8s_watch.stream(k8s_client.list_namespaced_pod,
+ namespace=settings.K8S_NAMESPACE,
+ timeout_seconds=timeout_seconds):
+ if event["object"].metadata.name == "pnf-simulator":
+ if not event["object"].status.phase in ["Pending", "Running"]:
+ # Invalid pod state
+ return False
+ return event["object"].status.phase == "Running"
+ return False
+ except urllib3.exceptions.HTTPError:
+ self._logger.error("Can't connect with k8s")
+ raise OnapTestException
+
+ def get_ves_ip_and_port(self) -> Tuple[str, str]:
"""Static method to get VES ip address and port.
Raises:
"""
config.load_kube_config(settings.K8S_CONFIG)
k8s_client: "CoreV1API" = client.CoreV1Api()
- for service in k8s_client.list_namespaced_service(namespace=settings.K8S_NAMESPACE).items:
- if service.metadata.name == "xdcae-ves-collector":
- return service.spec.cluster_ip, service.spec.ports[0].port
- raise EnvironmentPreparationException("Couldn't get VES ip and port")
+ try:
+ for service in k8s_client.list_namespaced_service(namespace=settings.K8S_NAMESPACE).items:
+ if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME:
+ return service.spec.cluster_ip, service.spec.ports[0].port
+ raise EnvironmentPreparationException("Couldn't get VES ip and port")
+ except urllib3.exceptions.HTTPError:
+ self._logger.error("Can't connect with k8s")
+ raise OnapTestException
@BaseStep.store_state
def execute(self) -> None: