1 # http://www.apache.org/licenses/LICENSE-2.0
2 """PNF simulator registration module."""
import time
from typing import Tuple

import requests
import urllib3
from kubernetes import client, config, watch
from onapsdk.configuration import settings

from onaptests.steps.base import BaseStep
from onaptests.steps.instantiate.msb_k8s import CreateInstanceStep
from onaptests.utils.exceptions import EnvironmentPreparationException, OnapTestException
class PnfSimulatorCnfRegisterStep(BaseStep):
    """PNF simulator registration step.

    Waits for the PNF simulator pod to be running, looks up the VES
    collector service and asks the simulator to start sending PNF
    registration events to it.
    """

    # Simulator endpoint that starts event generation. Kept as a class
    # attribute so a subclass can point the step at another deployment.
    SIMULATOR_START_URL = "http://portal.api.simpledemo.onap.org:30999/simulator/start"
    # Name of the pod whose phase is watched before registering.
    SIMULATOR_POD_NAME = "pnf-macro-test-simulator"
    # Upper bound (seconds) for a single registration request, so that an
    # unresponsive simulator cannot block the step forever.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, cleanup: bool = False) -> None:
        """Initialize step.

        Substeps:
            - CreateInstanceStep.

        Args:
            cleanup (bool, optional): Forwarded to the base step and to the
                ``CreateInstanceStep`` substep. Defaults to False.

        """
        super().__init__(cleanup=cleanup)
        self.add_step(CreateInstanceStep(cleanup=cleanup))

    # NOTE(review): the decorators in this class were not visible in the
    # reviewed listing and are restored by convention - confirm.
    @property
    def description(self) -> str:
        """Step description."""
        return "Register PNF simulator with VES."

    @property
    def component(self) -> str:
        """Component name."""
        # NOTE(review): the returned value was not visible in the reviewed
        # listing; "Environment" is an assumption - confirm.
        return "Environment"

    @staticmethod
    def _get_k8s_client() -> client.CoreV1Api:
        """Load the Kubernetes configuration and return a core API client.

        Uses the in-cluster configuration when ``settings.IN_CLUSTER`` is
        truthy, otherwise the kubeconfig file at ``settings.K8S_CONFIG``.
        """
        if settings.IN_CLUSTER:
            config.load_incluster_config()
        else:
            config.load_kube_config(config_file=settings.K8S_CONFIG)
        return client.CoreV1Api()

    def is_pnf_pod_running(self, timeout_seconds=120) -> bool:
        """Check if PNF simulator pod is running.

        Watches pod events in ``settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE``
        until the simulator pod reports a decisive phase or the watch ends.

        Args:
            timeout_seconds (int, optional): Timeout. Defaults to 120.

        Raises:
            OnapTestException: Kubernetes API is not reachable

        Returns:
            bool: True if PNF simulator pod is running, False otherwise

        """
        k8s_client = self._get_k8s_client()
        k8s_watch = watch.Watch()
        try:
            for event in k8s_watch.stream(
                    k8s_client.list_namespaced_pod,
                    namespace=settings.K8S_ADDITIONAL_RESOURCES_NAMESPACE,
                    timeout_seconds=timeout_seconds):
                pod = event["object"]
                if pod.metadata.name != self.SIMULATOR_POD_NAME:
                    continue
                phase = pod.status.phase
                if phase == "Running":
                    return True
                if phase != "Pending":
                    # Any phase other than Pending/Running means the pod
                    # will not come up; stop waiting.
                    return False
            # Watch ended without the pod ever reaching "Running".
            return False
        except urllib3.exceptions.HTTPError as http_error:
            self._logger.error("Can't connect with k8s")
            raise OnapTestException("Can't connect with k8s") from http_error
        finally:
            # Release the watch even when returning from inside the loop.
            k8s_watch.stop()

    def get_ves_protocol_ip_and_port(self) -> Tuple[str, str, str]:
        """Get VES protocol, ip address and port.

        Looks for the service named ``settings.DCAE_VES_COLLECTOR_POD_NAME``
        in ``settings.K8S_ONAP_NAMESPACE`` and reads its cluster IP and
        first port. The port is returned as provided by the Kubernetes
        client.

        Raises:
            EnvironmentPreparationException: VES pod is not running
            OnapTestException: Kubernetes API is not reachable

        Returns:
            Tuple[str, str, str]: VES protocol, IP and port

        """
        k8s_client = self._get_k8s_client()
        try:
            services = k8s_client.list_namespaced_service(
                namespace=settings.K8S_ONAP_NAMESPACE).items
        # NOTE(review): the exception type of this handler was not visible in
        # the reviewed listing; it mirrors is_pnf_pod_running - confirm.
        except urllib3.exceptions.HTTPError as http_error:
            self._logger.exception("Can't connect with k8s")
            raise OnapTestException("Can't connect with k8s") from http_error
        for service in services:
            if service.metadata.name == settings.DCAE_VES_COLLECTOR_POD_NAME:
                port = service.spec.ports[0].port
                # Substring match on purpose: any port containing "443"
                # (e.g. 8443) is treated as TLS.
                # NOTE(review): the http/https assignments were not visible
                # in the reviewed listing - confirm.
                proto = "https" if "443" in str(port) else "http"
                return proto, service.spec.cluster_ip, port
        raise EnvironmentPreparationException("Couldn't get VES protocol, ip and port")

    @staticmethod
    def _build_registration_payload(ves_url: str) -> dict:
        """Build the simulator request that triggers PNF registration events.

        Args:
            ves_url (str): VES event listener URL the simulator sends to.

        Returns:
            dict: JSON-serializable body for the simulator start request.

        """
        # NOTE(review): the wrapper keys ("simulatorParams", "patch",
        # "event") and the "repeatCount" entry were not visible in the
        # reviewed listing; confirm against the simulator API.
        return {
            "simulatorParams": {
                "repeatCount": 9999,
                "repeatInterval": 30,
                "vesServerUrl": ves_url,
            },
            "templateName": "registration.json",
            "patch": {
                "event": {
                    "commonEventHeader": {
                        "sourceName": settings.SERVICE_INSTANCE_NAME
                    },
                    "pnfRegistrationFields": {
                        "oamV4IpAddress": "192.168.0.1",
                        "oamV6IpAddress": "2001:db8::1428:57ab"
                    },
                },
            },
        }

    # NOTE(review): this decorator and the super().execute() call below were
    # not visible in the reviewed listing; restored by convention - confirm.
    @BaseStep.store_state
    def execute(self) -> None:
        """Send PNF registration event.

        Tries up to ``settings.PNF_REGISTRATION_NUMBER_OF_TRIES`` times,
        sleeping ``settings.PNF_WAIT_TIME`` seconds after each failure.

        Raises:
            EnvironmentPreparationException: PNF simulator pod is not running
                or the VES service could not be found
            OnapTestException: Kubernetes is unreachable or every
                registration attempt failed

        """
        super().execute()
        if not self.is_pnf_pod_running():
            raise EnvironmentPreparationException("PNF simulator is not running")
        # Let's still wait for PNF simulator to make sure it's initialized
        time.sleep(settings.PNF_WAIT_TIME)
        ves_proto, ves_ip, ves_port = self.get_ves_protocol_ip_and_port()
        payload = self._build_registration_payload(
            f"{ves_proto}://sample1:sample1@{ves_ip}:{ves_port}/eventListener/v7")
        for _ in range(settings.PNF_REGISTRATION_NUMBER_OF_TRIES):
            try:
                # NOTE(review): sending the body via `json=` is assumed; the
                # keyword was not visible in the reviewed listing.
                response = requests.post(
                    self.SIMULATOR_START_URL,
                    json=payload,
                    timeout=self.REQUEST_TIMEOUT_SECONDS,
                )
                response.raise_for_status()
            except (requests.ConnectionError,
                    requests.HTTPError,
                    requests.Timeout) as http_error:
                self._logger.debug("Can't connect with PNF simulator: %s", http_error)
                time.sleep(settings.PNF_WAIT_TIME)
            else:
                self._logger.info("PNF registered with %s source name",
                                  settings.SERVICE_INSTANCE_NAME)
                return
        raise OnapTestException("PNF not registered successfully")