PRH test fixes - align simulated Dmaap response to the real one
[integration/csit.git] / tests / dcaegen2 / prh-testcases / resources / PrhLibrary.py
1 import json
2 import re
3 import docker
4 import time
5
6
7 class PrhLibrary(object):
8
9     def __init__(self):
10         pass
11
12     @staticmethod
13     def find_one_of_log_entryies(searched_entries):
14         print(type(searched_entries))
15         client = docker.from_env()
16         container = client.containers.get('prh')
17         print("Check for log searches for pattern: ", searched_entries)
18         for line in container.logs(stream=True):
19             print("Check for log analysis line: ", line )
20             for searched_entry in searched_entries:
21                 if searched_entry in line.strip():
22                     return True
23         else:
24             return False
25
26     @staticmethod
27     def find_log_json(prefix, json_message):
28         print("Looking for:")
29         print("Prefix: " + str(prefix))
30         print("Json: " + str(json_message))
31         try:
32             decoded_message = json.loads(json_message)
33         except json.JSONDecodeError:
34             print("Could not decode given message")
35             return False
36         pattern = re.compile(prefix + "(.*)$")
37         client = docker.from_env()
38         container = client.containers.get('prh')
39         for line in container.logs(stream=True):
40             print("Check for log analysis line: ", line )
41             if PrhLibrary.__same_json_in_log(decoded_message, line, pattern):
42                 return True
43         else:
44             return False
45
46     @staticmethod
47     def create_invalid_notification(json_file):
48         output = {}
49         input = json.loads(json_file)
50         output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
51         output["oamV4IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV4IpAddress")
52         output["oamV6IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV6IpAddress")
53         output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
54         output["vendorName"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
55         output["modelNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
56         output["unitType"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
57         output['nfNamingCode'] = ''
58         output['softwareVersion'] = ''
59
60         output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
61
62         return json.dumps(output)
63
64     @staticmethod
65     def create_pnf_ready_notification_as_pnf_ready(json_file):
66         output = {}
67         input = json.loads(json_file)[0]
68
69         output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
70         output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
71         output["equip-vendor"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
72         output["equip-model"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
73         output["equip-type"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
74         output["nf-role"] = PrhLibrary.__extract_nf_role(input)
75         output["sw-version"] = ""
76
77         output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
78
79         return json.dumps(output)
80
81     @staticmethod
82     def ensure_container_is_running(name):
83         client = docker.from_env()
84
85         if not PrhLibrary.is_in_status(client, name, "running"):
86             print ("starting container", name)
87             container = client.containers.get(name)
88             container.start()
89             PrhLibrary.wait_for_status(client, name, "running")
90
91         PrhLibrary.print_status(client)
92
93     @staticmethod
94     def ensure_container_is_exited(name):
95         client = docker.from_env()
96
97         if not PrhLibrary.is_in_status(client, name, "exited"):
98             print ("stopping container", name)
99             container = client.containers.get(name)
100             container.stop()
101             PrhLibrary.wait_for_status(client, name, "exited")
102
103         PrhLibrary.print_status(client)
104
105     @staticmethod
106     def print_status(client):
107         print("containers status")
108         for c in client.containers.list(all=True):
109             print(c.name, "   ", c.status)
110
111     @staticmethod
112     def wait_for_status(client, name, status):
113         while not PrhLibrary.is_in_status(client, name, status):
114             print ("waiting for container: ", name, "to be in status: ", status)
115             time.sleep(3)
116
117     @staticmethod
118     def is_in_status(client, name, status):
119         return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
120
121     @staticmethod
122     def create_pnf_name(json_file):
123         json_to_python = json.loads(json_file)
124         correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
125         return correlation_id
126
127     @staticmethod
128     def __get_additional_fields_as_key_value_pairs(content):
129         return content.get("event").get("pnfRegistrationFields").get(
130             "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else {}
131
132     @staticmethod
133     def __extract_value_from_pnfRegistrationFields(content, key):
134         return content["event"]["pnfRegistrationFields"][key] if key in content["event"]["pnfRegistrationFields"] else ''
135
136     @staticmethod
137     def __extract_correlation_id_value(content):
138         return content["event"]["commonEventHeader"]["sourceName"] if "sourceName" in content["event"]["commonEventHeader"] else ''
139
140     @staticmethod
141     def __extract_nf_role(content):
142         return content["event"]["commonEventHeader"]["nfNamingCode"] if "nfNamingCode" in content["event"]["commonEventHeader"] else ''
143
144     @staticmethod
145     def __same_json_in_log(decoded_message, line, pattern):
146         extracted_json = PrhLibrary.__extract_json(line, pattern)
147         if extracted_json is not None:
148             print("Found json: " + extracted_json)
149             try:
150                 if json.loads(extracted_json) == decoded_message:
151                     return True
152             except json.JSONDecodeError:
153                 print("Could not decode")
154         return False
155
156     @staticmethod
157     def __extract_json(line, pattern):
158         full_message = PrhLibrary.__extract_full_message_from_line(line)
159         if full_message is not None:
160             match = pattern.match(full_message)
161             if match:
162                 return match.group(1).replace("\\n", "\n").replace("\\t", "\t")
163         return None
164
165     @staticmethod
166     def __extract_full_message_from_line(line):
167         split = line.split("|")
168         if len(split) > 3:
169             return split[3]
170         return None