Merge "Add new json events to PRH CSIT"
[integration/csit.git] / tests / dcaegen2 / prh-testcases / resources / PrhLibrary.py
1 import json
2
3 import docker
4 import time
5
6
7 class PrhLibrary(object):
8
9     def __init__(self):
10         pass
11
12     @staticmethod
13     def check_for_log(search_for):
14         client = docker.from_env()
15         container = client.containers.get('prh')
16         #print ("Check for log searches for pattern: ", search_for )
17         for line in container.logs(stream=True):
18             #print ("Check for log analysis line: ", line )
19             if search_for in line.strip():
20                 return True
21         else:
22             return False
23
24     @staticmethod
25     def create_pnf_ready_notification_from_ves(json_file):
26         json_to_python = json.loads(json_file)
27         ipv4 =  PrhLibrary.extract_ip_v4(json_to_python)
28         ipv6 = PrhLibrary.extract_ip_v6(json_to_python)
29         correlation_id = PrhLibrary.extract_correlation_id(json_to_python)
30         serial_number = PrhLibrary.extract_serial_number(json_to_python)
31         vendor_name = PrhLibrary.extract_vendor_name(json_to_python)
32         model_number = PrhLibrary.extract_model_number(json_to_python)
33         unit_type = PrhLibrary.extract_unit_type(json_to_python)
34
35         str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serialNumber":"' + serial_number + '","vendorName":"' + vendor_name  + '","modelNumber":"' + model_number + '","unitType":"' + unit_type + '"}'
36         python_to_json = json.dumps(str_json)
37         return python_to_json.replace("\\", "")[1:-1]
38
39     @staticmethod
40     def create_pnf_ready_notification_as_pnf_ready(json_file):
41         json_to_python = json.loads(json_file)
42         ipv4 =  PrhLibrary.extract_ip_v4(json_to_python)
43         ipv6 = PrhLibrary.extract_ip_v6(json_to_python)
44         correlation_id = PrhLibrary.extract_correlation_id(json_to_python)
45         serial_number = PrhLibrary.extract_serial_number(json_to_python)
46         vendor_name = PrhLibrary.extract_vendor_name(json_to_python)
47         model_number = PrhLibrary.extract_model_number(json_to_python)
48         unit_type = PrhLibrary.extract_unit_type(json_to_python)
49
50         nf_role  = json_to_python.get("event").get("commonEventHeader").get("nfNamingCode") if "nfNamingCode" in json_to_python["event"]["commonEventHeader"] else ""
51
52         str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + vendor_name  + '","equip-model":"' + model_number + '","equip-type":"' + unit_type + '","nf-role":"' + nf_role + '","sw-version":""}'
53         python_to_json = json.dumps(str_json)
54         return python_to_json.replace("\\", "")[1:-1]
55
56     @staticmethod
57     def extract_ip_v4(content):
58         return content.get("event").get("pnfRegistrationFields").get("oamV4IpAddress") if "oamV4IpAddress" in content["event"]["pnfRegistrationFields"] else ""
59
60     @staticmethod
61     def extract_ip_v6(content):
62         return content.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in content["event"]["pnfRegistrationFields"] else ""
63
64     @staticmethod
65     def extract_correlation_id(content):
66         return content.get("event").get("commonEventHeader").get("sourceName") if "sourceName" in content["event"]["commonEventHeader"] else ""
67
68     @staticmethod
69     def extract_serial_number(content):
70         return content.get("event").get("pnfRegistrationFields").get("serialNumber") if "serialNumber" in content["event"]["pnfRegistrationFields"] else ""
71
72     @staticmethod
73     def extract_vendor_name(content):
74         return content.get("event").get("pnfRegistrationFields").get("vendorName") if "vendorName" in content["event"]["pnfRegistrationFields"] else ""
75
76     @staticmethod
77     def extract_model_number(content):
78         return content.get("event").get("pnfRegistrationFields").get("modelNumber") if "modelNumber" in content["event"]["pnfRegistrationFields"] else ""
79
80     @staticmethod
81     def extract_unit_type(content):
82         return content.get("event").get("pnfRegistrationFields").get("unitType") if "unitType" in content["event"]["pnfRegistrationFields"] else ""
83
84     @staticmethod
85     def create_pnf_name(json_file):
86         json_to_python = json.loads(json_file)
87         correlation_id = json_to_python.get("sourceName")
88         return correlation_id
89
90     @staticmethod
91     def ensure_container_is_running(name):
92         client = docker.from_env()
93
94         if not PrhLibrary.is_in_status(client, name, "running"):
95             print ("starting container", name)
96             container = client.containers.get(name)
97             container.start()
98             PrhLibrary.wait_for_status(client, name, "running")
99
100         PrhLibrary.print_status(client)
101
102     @staticmethod
103     def ensure_container_is_exited(name):
104         client = docker.from_env()
105
106         if not PrhLibrary.is_in_status(client, name, "exited"):
107             print ("stopping container", name)
108             container = client.containers.get(name)
109             container.stop()
110             PrhLibrary.wait_for_status(client, name, "exited")
111
112         PrhLibrary.print_status(client)
113
114     @staticmethod
115     def print_status(client):
116         print("containers status")
117         for c in client.containers.list(all=True):
118             print(c.name, "   ", c.status)
119
120     @staticmethod
121     def wait_for_status(client, name, status):
122         while not PrhLibrary.is_in_status(client, name, status):
123             print ("waiting for container: ", name, "to be in status: ", status)
124             time.sleep(3)
125
126     @staticmethod
127     def is_in_status(client, name, status):
128         return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
129
130
131     def create_invalid_notification(self, json_file):
132         invalidate_input = self.create_pnf_ready_notification_from_ves(json_file).replace("\":", "\": ") \
133             .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam","oamV6IpAddress").replace("}",'')
134         invalidate_input__and_add_additional_fields = invalidate_input + ',"nfNamingCode": ""' + "," + '"softwareVersion": ""' +"\\n"
135         return invalidate_input__and_add_additional_fields