d1992283b0efd65130f403a642edf9e2e9ca0b68
[integration/csit.git] / tests / dcaegen2 / prh-testcases / resources / PrhLibrary.py
1 import json
2 import re
3 import docker
4 import time
5 import datetime
6
7
8 class PrhLibrary(object):
9     ROBOT_LIBRARY_SCOPE = 'TEST SUITE'
10     ROBOT_LISTENER_API_VERSION = 2
11
12     def __init__(self):
13         self.ROBOT_LIBRARY_LISTENER = self
14
15     def _start_test(self, name, attrs):
16         # http://robotframework.org/robotframework/latest/RobotFrameworkUserGuide.html#test-libraries-as-listeners
17         self.test_start_time = self.get_current_utc_datetime()
18
19     @staticmethod
20     def get_current_utc_datetime():
21         return datetime.datetime.utcnow()
22
23     def get_docker_logs_since_test_start(self, container_id):
24         return self.get_docker_logs(container_id, self.test_start_time)
25
26     @staticmethod
27     def get_docker_logs(container_id, since=None):
28         container = PrhLibrary.__get_docker_container(container_id)
29         return container.logs(stream=False, since=since)
30
31     def wait_for_one_of_docker_log_entries(self, container_id, searched_entries):
32         print("Looking for: %s" % searched_entries)
33         container = PrhLibrary.__get_docker_container(container_id)
34         print("Log lines:")
35         for line in container.logs(stream=True, since=self.test_start_time):
36             print(line)
37             for searched_entry in searched_entries:
38                 if searched_entry in line.strip():
39                     return True
40         else:
41             return False
42
43     def wait_for_log_entry_with_json_message(self, prefix, json_message):
44         print("Looking for:")
45         print("Prefix: " + str(prefix))
46         print("Json: " + str(json_message))
47         try:
48             decoded_message = json.loads(json_message)
49         except json.JSONDecodeError:
50             print("Could not decode given message")
51             return False
52         pattern = re.compile(prefix + "(.*)$")
53         container = PrhLibrary.__get_docker_container('prh')
54         print("Log lines:")
55         for line in container.logs(stream=True, since=self.test_start_time):
56             print(line)
57             if PrhLibrary.__same_json_in_log(decoded_message, line, pattern):
58                 return True
59         else:
60             return False
61
62     @staticmethod
63     def create_invalid_notification(json_file):
64         output = {}
65         input = json.loads(json_file)
66         output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
67         output["oamV4IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV4IpAddress")
68         output["oamV6IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV6IpAddress")
69         output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
70         output["vendorName"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
71         output["modelNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
72         output["unitType"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
73         output['nfNamingCode'] = ''
74         output['softwareVersion'] = ''
75
76         output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
77
78         return json.dumps(output)
79
80     @staticmethod
81     def create_pnf_ready_notification_as_pnf_ready(json_file):
82         output = {}
83         input = json.loads(json_file)[0]
84
85         output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
86         output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
87         output["equip-vendor"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
88         output["equip-model"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
89         output["equip-type"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
90         output["nf-role"] = PrhLibrary.__extract_nf_role(input)
91         output["sw-version"] = ""
92
93         output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
94
95         return json.dumps(output)
96
97     @staticmethod
98     def ensure_container_is_running(name):
99         client = docker.from_env()
100
101         if not PrhLibrary.is_in_status(client, name, "running"):
102             print ("starting container", name)
103             container = client.containers.get(name)
104             container.start()
105             PrhLibrary.wait_for_status(client, name, "running")
106
107         PrhLibrary.print_status(client)
108
109     @staticmethod
110     def ensure_container_is_exited(name):
111         client = docker.from_env()
112
113         if not PrhLibrary.is_in_status(client, name, "exited"):
114             print ("stopping container", name)
115             container = client.containers.get(name)
116             container.stop()
117             PrhLibrary.wait_for_status(client, name, "exited")
118
119         PrhLibrary.print_status(client)
120
121     @staticmethod
122     def print_status(client):
123         print("containers status")
124         for c in client.containers.list(all=True):
125             print(c.name, "   ", c.status)
126
127     @staticmethod
128     def wait_for_status(client, name, status):
129         while not PrhLibrary.is_in_status(client, name, status):
130             print ("waiting for container: ", name, "to be in status: ", status)
131             time.sleep(3)
132
133     @staticmethod
134     def is_in_status(client, name, status):
135         return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
136
137     @staticmethod
138     def create_pnf_name(json_file):
139         json_to_python = json.loads(json_file)
140         correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
141         return correlation_id
142
143     @staticmethod
144     def __get_additional_fields_as_key_value_pairs(content):
145         return content.get("event").get("pnfRegistrationFields").get(
146             "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else {}
147
148     @staticmethod
149     def __extract_value_from_pnfRegistrationFields(content, key):
150         return content["event"]["pnfRegistrationFields"][key] if key in content["event"]["pnfRegistrationFields"] else ''
151
152     @staticmethod
153     def __extract_correlation_id_value(content):
154         return content["event"]["commonEventHeader"]["sourceName"] if "sourceName" in content["event"]["commonEventHeader"] else ''
155
156     @staticmethod
157     def __extract_nf_role(content):
158         return content["event"]["commonEventHeader"]["nfNamingCode"] if "nfNamingCode" in content["event"]["commonEventHeader"] else ''
159
160     @staticmethod
161     def __same_json_in_log(decoded_message, line, pattern):
162         extracted_json = PrhLibrary.__extract_json(line, pattern)
163         if extracted_json is not None:
164             print("Found json: " + extracted_json)
165             try:
166                 if json.loads(extracted_json) == decoded_message:
167                     return True
168             except json.JSONDecodeError:
169                 print("Could not decode")
170         return False
171
172     @staticmethod
173     def __extract_json(line, pattern):
174         full_message = PrhLibrary.__extract_full_message_from_line(line)
175         if full_message is not None:
176             match = pattern.match(full_message)
177             if match:
178                 return match.group(1).replace("\\n", "\n").replace("\\t", "\t")
179         return None
180
181     @staticmethod
182     def __extract_full_message_from_line(line):
183         split = line.split("|")
184         if len(split) > 3:
185             return split[3]
186         return None
187
188     @staticmethod
189     def __get_docker_container(container_id):
190         docker_client = docker.from_env()
191         return docker_client.containers.get(container_id)