Add new json events to PRH CSIT
[integration/csit.git] / tests / dcaegen2 / prh-testcases / resources / PrhLibrary.py
1 import json
2
3 import docker
4 import time
5
6
7 class PrhLibrary(object):
8
9     def __init__(self):
10         pass
11
12     @staticmethod
13     def check_for_log(search_for):
14         client = docker.from_env()
15         container = client.containers.get('prh')
16         for line in container.logs(stream=True):
17             if search_for in line.strip():
18                 return True
19         else:
20             return False
21
22     @staticmethod
23     def create_pnf_ready_notification(json_file):
24         json_to_python = json.loads(json_file)
25         ipv4 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV4IpAddress")
26         ipv6 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in json_to_python["event"]["pnfRegistrationFields"] else ""
27         serial_number = json_to_python.get("event").get("pnfRegistrationFields").get("serial-number") if "serial-number" in json_to_python["event"]["pnfRegistrationFields"] else ""
28         equip_vendor = json_to_python.get("event").get("pnfRegistrationFields").get("equip-vendor") if "equip-vendor" in json_to_python["event"]["pnfRegistrationFields"] else ""
29         equip_model = json_to_python.get("event").get("pnfRegistrationFields").get("equip-model") if "equip-model" in json_to_python["event"]["pnfRegistrationFields"] else ""
30         equip_type = json_to_python.get("event").get("pnfRegistrationFields").get("equip-type") if "equip-type" in json_to_python["event"]["pnfRegistrationFields"] else ""
31         nf_role = json_to_python.get("event").get("pnfRegistrationFields").get("nf-role") if "nf-role" in json_to_python["event"]["pnfRegistrationFields"] else ""
32         sw_version = json_to_python.get("event").get("pnfRegistrationFields").get("sw-version") if "sw-version" in json_to_python["event"]["pnfRegistrationFields"] else ""
33         correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
34         str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + equip_vendor + '","equip-model":"' + equip_model + '","equip-type":"' + equip_type + '","nf-role":"' + nf_role + '","sw-version":"' + sw_version + '"}'
35         python_to_json = json.dumps(str_json)
36         return python_to_json.replace("\\", "")[1:-1]
37
38     @staticmethod
39     def create_pnf_name(json_file):
40         json_to_python = json.loads(json_file)
41         correlation_id = json_to_python.get("sourceName")
42         return correlation_id
43
44     @staticmethod
45     def ensure_container_is_running(name):
46         client = docker.from_env()
47
48         if not PrhLibrary.is_in_status(client, name, "running"):
49             print ("starting container", name)
50             container = client.containers.get(name)
51             container.start()
52             PrhLibrary.wait_for_status(client, name, "running")
53
54         PrhLibrary.print_status(client)
55
56     @staticmethod
57     def ensure_container_is_exited(name):
58         client = docker.from_env()
59
60         if not PrhLibrary.is_in_status(client, name, "exited"):
61             print ("stopping container", name)
62             container = client.containers.get(name)
63             container.stop()
64             PrhLibrary.wait_for_status(client, name, "exited")
65
66         PrhLibrary.print_status(client)
67
68     @staticmethod
69     def print_status(client):
70         print("containers status")
71         for c in client.containers.list(all=True):
72             print(c.name, "   ", c.status)
73
74     @staticmethod
75     def wait_for_status(client, name, status):
76         while not PrhLibrary.is_in_status(client, name, status):
77             print ("waiting for container: ", name, "to be in status: ", status)
78             time.sleep(3)
79
80     @staticmethod
81     def is_in_status(client, name, status):
82         return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
83
84
85     def create_invalid_notification(self, json_file):
86         return self.create_pnf_ready_notification(json_file).replace("\":", "\": ")\
87             .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam", "oamV6IpAddress")\
88             .replace("}", "\\n}")