Update expected log entries in PRH in case AAI has a missing entry or is not responding
[integration/csit.git] / tests / dcaegen2 / prh-testcases / resources / PrhLibrary.py
1 import json
2 import re
3 import docker
4 import time
5
6
class PrhLibrary(object):
    """Helper keywords for the PRH test cases.

    Every method is a ``staticmethod``.  The public ones inspect the logs of
    the docker container named ``prh``, build notification JSON documents
    from registration events and start/stop docker containers.  Methods whose
    names start with two underscores are internal helpers.
    """

    def __init__(self):
        pass

    @staticmethod
    def find_one_of_log_entryies(searched_entries):
        """Return True when a ``prh`` log line contains one of the entries.

        :param searched_entries: iterable of substrings to look for.  A single
            string is treated as a one-element list instead of being iterated
            character by character.
        :return: True as soon as any entry is a substring of a (stripped) log
            line, False when the log stream ends without a match.

        NOTE(review): the log is read with ``stream=True``; the docker SDK
        appears to keep following the log in that mode, so this may block
        until a match is found or the container stops - confirm.
        """
        print(type(searched_entries))
        client = docker.from_env()
        container = client.containers.get('prh')
        print("Check for log searches for pattern: ", searched_entries)
        if isinstance(searched_entries, (str, bytes)):
            searched_entries = [searched_entries]
        # Log lines arrive as bytes on Python 3; compare text with text.
        entries = [PrhLibrary.__as_text(entry) for entry in searched_entries]
        for line in container.logs(stream=True):
            print("Check for log analysis line: ", line)
            text = PrhLibrary.__as_text(line).strip()
            if any(entry in text for entry in entries):
                return True
        return False

    @staticmethod
    def find_log_json(prefix, json_message):
        """Return True when a ``prh`` log line carries the given JSON message.

        :param prefix: regular-expression text expected directly before the
            JSON payload at the start of the log message field.
        :param json_message: JSON text to look for; compared after decoding,
            so formatting and key order do not matter.
        :return: True on the first matching log line; False when
            ``json_message`` is not valid JSON or the log stream ends without
            a match.
        """
        print("Looking for:")
        print("Prefix: " + str(prefix))
        print("Json: " + str(json_message))
        try:
            decoded_message = json.loads(json_message)
        except json.JSONDecodeError:
            print("Could not decode given message")
            return False
        pattern = re.compile(prefix + "(.*)$")
        client = docker.from_env()
        container = client.containers.get('prh')
        for line in container.logs(stream=True):
            print("Check for log analysis line: ", line)
            if PrhLibrary.__same_json_in_log(decoded_message, line, pattern):
                return True
        return False

    @staticmethod
    def create_invalid_notification(json_file):
        """Flatten the first event of a JSON array into a notification.

        :param json_file: JSON text holding an array of events; only the
            first element is used.
        :return: JSON text with the correlation id, the registration fields
            (empty string when absent), empty ``nfNamingCode`` and
            ``softwareVersion`` values and the additional fields.
        """
        event = json.loads(json_file)[0]
        field = PrhLibrary.__extract_value_from_pnfRegistrationFields

        output = {}
        output["correlationId"] = PrhLibrary.__extract_correlation_id_value(event)
        output["oamV4IpAddress"] = field(event, "oamV4IpAddress")
        output["oamV6IpAddress"] = field(event, "oamV6IpAddress")
        output["serialNumber"] = field(event, "serialNumber")
        output["vendorName"] = field(event, "vendorName")
        output["modelNumber"] = field(event, "modelNumber")
        output["unitType"] = field(event, "unitType")
        output['nfNamingCode'] = ''
        output['softwareVersion'] = ''
        output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(event)

        return json.dumps(output)

    @staticmethod
    def create_pnf_ready_notification_as_pnf_ready(json_file):
        """Build a notification with ``equip-*`` keys from the first event.

        :param json_file: JSON text holding an array of events; only the
            first element is used.
        :return: JSON text with the correlation id, serial number, the
            ``equip-vendor``/``equip-model``/``equip-type`` values, the
            ``nf-role`` taken from the header's ``nfNamingCode``, an empty
            ``sw-version`` and the additional fields.
        """
        event = json.loads(json_file)[0]
        field = PrhLibrary.__extract_value_from_pnfRegistrationFields

        output = {}
        output["correlationId"] = PrhLibrary.__extract_correlation_id_value(event)
        output["serialNumber"] = field(event, "serialNumber")
        output["equip-vendor"] = field(event, "vendorName")
        output["equip-model"] = field(event, "modelNumber")
        output["equip-type"] = field(event, "unitType")
        output["nf-role"] = PrhLibrary.__extract_nf_role(event)
        output["sw-version"] = ""
        output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(event)

        return json.dumps(output)

    @staticmethod
    def ensure_container_is_running(name):
        """Start container ``name`` unless it is already running, then wait
        for the ``running`` status and print the status of all containers."""
        client = docker.from_env()

        if not PrhLibrary.is_in_status(client, name, "running"):
            print("starting container", name)
            container = client.containers.get(name)
            container.start()
            PrhLibrary.wait_for_status(client, name, "running")

        PrhLibrary.print_status(client)

    @staticmethod
    def ensure_container_is_exited(name):
        """Stop container ``name`` unless it has already exited, then wait
        for the ``exited`` status and print the status of all containers."""
        client = docker.from_env()

        if not PrhLibrary.is_in_status(client, name, "exited"):
            print("stopping container", name)
            container = client.containers.get(name)
            container.stop()
            PrhLibrary.wait_for_status(client, name, "exited")

        PrhLibrary.print_status(client)

    @staticmethod
    def print_status(client):
        """Print the name and status of every container known to ``client``,
        including the ones that are not running."""
        print("containers status")
        for c in client.containers.list(all=True):
            print(c.name, "   ", c.status)

    @staticmethod
    def wait_for_status(client, name, status):
        """Poll every 3 seconds until container ``name`` reports ``status``.

        There is no timeout: the call only returns once the status is
        reached.
        """
        while not PrhLibrary.is_in_status(client, name, status):
            print("waiting for container: ", name, "to be in status: ", status)
            time.sleep(3)

    @staticmethod
    def is_in_status(client, name, status):
        """Return True when exactly one container matches the anchored name
        filter ``^/<name>$`` and the given ``status``."""
        matching = client.containers.list(
            all=True, filters={"name": "^/" + name + "$", "status": status})
        return len(matching) == 1

    @staticmethod
    def create_pnf_name(json_file):
        """Return the event's ``sourceName`` followed by ``",``.

        :param json_file: JSON text of a single event object (not an array).
        :return: ``<sourceName>",`` or just ``",`` when the common event
            header has no ``sourceName``.
        """
        header = json.loads(json_file)["event"]["commonEventHeader"]
        return header.get("sourceName", '') + '",'

    @staticmethod
    def __get_additional_fields_as_key_value_pairs(content):
        """Return the event's ``additionalFields`` or ``{}`` when absent."""
        return content["event"]["pnfRegistrationFields"].get("additionalFields", {})

    @staticmethod
    def __extract_value_from_pnfRegistrationFields(content, key):
        """Return ``pnfRegistrationFields[key]`` or ``''`` when absent."""
        return content["event"]["pnfRegistrationFields"].get(key, '')

    @staticmethod
    def __extract_correlation_id_value(content):
        """Return the header's ``sourceName`` or ``''`` when absent."""
        return content["event"]["commonEventHeader"].get("sourceName", '')

    @staticmethod
    def __extract_nf_role(content):
        """Return the header's ``nfNamingCode`` or ``''`` when absent."""
        return content["event"]["commonEventHeader"].get("nfNamingCode", '')

    @staticmethod
    def __as_text(value):
        """Return ``value`` as text, decoding ``bytes`` as UTF-8.

        Undecodable bytes are replaced rather than raising, so one odd log
        line cannot abort a log search.  Non-bytes values pass through.
        """
        if isinstance(value, bytes):
            return value.decode("utf-8", "replace")
        return value

    @staticmethod
    def __same_json_in_log(decoded_message, line, pattern):
        """Return True when ``line`` holds JSON equal to ``decoded_message``.

        Lines without a matching message field, or whose payload is not
        valid JSON, yield False.
        """
        extracted_json = PrhLibrary.__extract_json(line, pattern)
        if extracted_json is None:
            return False
        print("Found json: " + extracted_json)
        try:
            return json.loads(extracted_json) == decoded_message
        except json.JSONDecodeError:
            print("Could not decode")
            return False

    @staticmethod
    def __extract_json(line, pattern):
        """Return the text captured by ``pattern`` from the line's message
        field, or None when the field is missing or does not match.

        Literal ``\\n`` and ``\\t`` sequences in the captured text are turned
        into real newline and tab characters.
        """
        full_message = PrhLibrary.__extract_full_message_from_line(line)
        if full_message is None:
            return None
        match = pattern.match(full_message)
        if not match:
            return None
        return match.group(1).replace("\\n", "\n").replace("\\t", "\t")

    @staticmethod
    def __extract_full_message_from_line(line):
        """Return the fourth ``|``-separated field of ``line`` or None when
        the line has fewer than four fields.

        ``line`` may be ``bytes`` (as yielded by the docker log stream) or
        text; bytes are decoded first so the split does not raise.
        """
        split = PrhLibrary.__as_text(line).split("|")
        if len(split) > 3:
            return split[3]
        return None