Fix PRH CSITs with additionalFields
[integration/csit.git] / tests / dcaegen2 / prh-testcases / resources / PrhLibrary.py
1 import json
2
3 import docker
4 import time
5
6
7 class PrhLibrary(object):
8
9     def __init__(self):
10         pass
11
12     @staticmethod
13     def check_for_log(search_for):
14         client = docker.from_env()
15         container = client.containers.get('prh')
16         print ("Check for log searches for pattern: ", search_for )
17         for line in container.logs(stream=True):
18             print ("Check for log analysis line: ", line )
19             if search_for in line.strip():
20                 return True
21         else:
22             return False
23
24     @staticmethod
25     def create_invalid_notification(json_file):
26         json_to_python = json.loads(json_file)
27         correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
28         ipv4 = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "oamV4IpAddress", "oamV4IpAddress")
29         ipv6 = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "oamV6IpAddress", "oamV6IpAddress")
30         serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "serialNumber", "serialNumber")
31         vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "vendorName", "vendorName")
32         model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "modelNumber", "modelNumber")
33         unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "unitType", "unitType")
34         additional_fields = PrhLibrary.extract_additional_fields(json_to_python, "additionalFields")
35
36         str_json = '{' + correlation_id + ipv4 + ipv6 + serial_number + vendor_name + model_number + unit_type + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields
37         return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}'
38
39     @staticmethod
40     def create_pnf_ready_notification_as_pnf_ready(json_file):
41         json_to_python = json.loads(json_file)
42         correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
43         serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "serial-number", "serialNumber")
44         vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "equip-vendor", "vendorName")
45         model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "equip-model", "modelNumber")
46         unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "equip-type", "unitType")
47         additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python, "additionalFields")
48
49         nf_role  = json_to_python.get("event").get("commonEventHeader").get("nfNamingCode") if "nfNamingCode" in json_to_python["event"]["commonEventHeader"] else ""
50
51         str_json = '{' + correlation_id + serial_number + vendor_name + model_number + unit_type + '"nf-role":"' + nf_role + '","sw-version":"",' + additional_fields
52
53         return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1]
54
55     @staticmethod
56     def extract_additional_fields(content, name):
57         fields = content.get("event").get("pnfRegistrationFields").get(name) if name in content["event"]["pnfRegistrationFields"] else []
58         if fields == []:
59             return '"additionalFields":' + 'null'
60         res = '"' + name + '":{'
61         for f in fields:
62             res += '"' + f + '"' + ':' + '"' + fields.get(f) + '",'
63         return res.rstrip(',') + '},'
64
65     @staticmethod
66     def extract_additional_fields_value(content, name):
67         fields = content.get("event").get("pnfRegistrationFields").get(name) if name in content["event"]["pnfRegistrationFields"] else []
68         if fields == [] or len(fields) == 0:
69             return ""
70         res = '"' + name + '":{'
71         for f in fields:
72             res += '"' + f + '"' + ':' + '"' + fields.get(f) + '",'
73         return res.rstrip(',') + '},'
74
75     @staticmethod
76     def extract_value_from_pnfRegistrationFields(content, name, key):
77         return '"' + name + '":"' + (content.get("event").get("pnfRegistrationFields").get(key) + '",' if key in content["event"]["pnfRegistrationFields"] else '",')
78
79     @staticmethod
80     def extract_correlation_id_value(content, name):
81         return '"' + name + '":"' + (content.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in content["event"]["commonEventHeader"] else '",')
82
83     @staticmethod
84     def create_pnf_name(json_file):
85         json_to_python = json.loads(json_file)
86         correlation_id = json_to_python.get("sourceName")
87         return correlation_id
88
89     @staticmethod
90     def ensure_container_is_running(name):
91         client = docker.from_env()
92
93         if not PrhLibrary.is_in_status(client, name, "running"):
94             print ("starting container", name)
95             container = client.containers.get(name)
96             container.start()
97             PrhLibrary.wait_for_status(client, name, "running")
98
99         PrhLibrary.print_status(client)
100
101     @staticmethod
102     def ensure_container_is_exited(name):
103         client = docker.from_env()
104
105         if not PrhLibrary.is_in_status(client, name, "exited"):
106             print ("stopping container", name)
107             container = client.containers.get(name)
108             container.stop()
109             PrhLibrary.wait_for_status(client, name, "exited")
110
111         PrhLibrary.print_status(client)
112
113     @staticmethod
114     def print_status(client):
115         print("containers status")
116         for c in client.containers.list(all=True):
117             print(c.name, "   ", c.status)
118
119     @staticmethod
120     def wait_for_status(client, name, status):
121         while not PrhLibrary.is_in_status(client, name, status):
122             print ("waiting for container: ", name, "to be in status: ", status)
123             time.sleep(3)
124
125     @staticmethod
126     def is_in_status(client, name, status):
127         return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1