Update expected log entries in PRH in case AAI has missing entry or is not reponding
[integration/csit.git] / tests / dcaegen2 / prh-testcases / resources / PrhLibrary.py
index d413be5..b994261 100644 (file)
@@ -1,5 +1,5 @@
 import json
-
+import re
 import docker
 import time
 
@@ -10,36 +10,74 @@ class PrhLibrary(object):
         pass
 
     @staticmethod
-    def check_for_log(search_for):
+    def find_one_of_log_entryies(searched_entries):
+        print(type(searched_entries))
+        client = docker.from_env()
+        container = client.containers.get('prh')
+        print("Check for log searches for pattern: ", searched_entries)
+        for line in container.logs(stream=True):
+            print("Check for log analysis line: ", line )
+            for searched_entry in searched_entries:
+                if searched_entry in line.strip():
+                    return True
+        else:
+            return False
+
+    @staticmethod
+    def find_log_json(prefix, json_message):
+        print("Looking for:")
+        print("Prefix: " + str(prefix))
+        print("Json: " + str(json_message))
+        try:
+            decoded_message = json.loads(json_message)
+        except json.JSONDecodeError:
+            print("Could not decode given message")
+            return False
+        pattern = re.compile(prefix + "(.*)$")
         client = docker.from_env()
         container = client.containers.get('prh')
         for line in container.logs(stream=True):
-            if search_for in line.strip():
+            print("Check for log analysis line: ", line )
+            if PrhLibrary.__same_json_in_log(decoded_message, line, pattern):
                 return True
         else:
             return False
 
     @staticmethod
-    def create_pnf_ready_notification(json_file):
-        json_to_python = json.loads(json_file)
-        ipv4 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV4IpAddress")
-        ipv6 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in json_to_python["event"]["pnfRegistrationFields"] else ""
-        serial_number = json_to_python.get("event").get("pnfRegistrationFields").get("serial-number") if "serial-number" in json_to_python["event"]["pnfRegistrationFields"] else ""
-        equip_vendor = json_to_python.get("event").get("pnfRegistrationFields").get("equip-vendor") if "equip-vendor" in json_to_python["event"]["pnfRegistrationFields"] else ""
-        equip_model = json_to_python.get("event").get("pnfRegistrationFields").get("equip-model") if "equip-model" in json_to_python["event"]["pnfRegistrationFields"] else ""
-        equip_type = json_to_python.get("event").get("pnfRegistrationFields").get("equip-type") if "equip-type" in json_to_python["event"]["pnfRegistrationFields"] else ""
-        nf_role = json_to_python.get("event").get("pnfRegistrationFields").get("nf-role") if "nf-role" in json_to_python["event"]["pnfRegistrationFields"] else ""
-        sw_version = json_to_python.get("event").get("pnfRegistrationFields").get("sw-version") if "sw-version" in json_to_python["event"]["pnfRegistrationFields"] else ""
-        correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
-        str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + equip_vendor + '","equip-model":"' + equip_model + '","equip-type":"' + equip_type + '","nf-role":"' + nf_role + '","sw-version":"' + sw_version + '"}'
-        python_to_json = json.dumps(str_json)
-        return python_to_json.replace("\\", "")[1:-1]
+    def create_invalid_notification(json_file):
+        output = {}
+        input = json.loads(json_file)[0]
+
+        output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
+        output["oamV4IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV4IpAddress")
+        output["oamV6IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV6IpAddress")
+        output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
+        output["vendorName"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
+        output["modelNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
+        output["unitType"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
+        output['nfNamingCode'] = ''
+        output['softwareVersion'] = ''
+
+        output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
+
+        return json.dumps(output)
 
     @staticmethod
-    def create_pnf_name(json_file):
-        json_to_python = json.loads(json_file)
-        correlation_id = json_to_python.get("sourceName")
-        return correlation_id
+    def create_pnf_ready_notification_as_pnf_ready(json_file):
+        output = {}
+        input = json.loads(json_file)[0]
+
+        output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
+        output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
+        output["equip-vendor"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
+        output["equip-model"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
+        output["equip-type"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
+        output["nf-role"] = PrhLibrary.__extract_nf_role(input)
+        output["sw-version"] = ""
+
+        output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
+
+        return json.dumps(output)
 
     @staticmethod
     def ensure_container_is_running(name):
@@ -81,8 +119,53 @@ class PrhLibrary(object):
     def is_in_status(client, name, status):
         return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
 
+    @staticmethod
+    def create_pnf_name(json_file):
+        json_to_python = json.loads(json_file)
+        correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
+        return correlation_id
+
+    @staticmethod
+    def __get_additional_fields_as_key_value_pairs(content):
+        return content.get("event").get("pnfRegistrationFields").get(
+            "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else {}
+
+    @staticmethod
+    def __extract_value_from_pnfRegistrationFields(content, key):
+        return content["event"]["pnfRegistrationFields"][key] if key in content["event"]["pnfRegistrationFields"] else ''
+
+    @staticmethod
+    def __extract_correlation_id_value(content):
+        return content["event"]["commonEventHeader"]["sourceName"] if "sourceName" in content["event"]["commonEventHeader"] else ''
 
-    def create_invalid_notification(self, json_file):
-        return self.create_pnf_ready_notification(json_file).replace("\":", "\": ")\
-            .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam", "oamV6IpAddress")\
-            .replace("}", "\\n}")
+    @staticmethod
+    def __extract_nf_role(content):
+        return content["event"]["commonEventHeader"]["nfNamingCode"] if "nfNamingCode" in content["event"]["commonEventHeader"] else ''
+
+    @staticmethod
+    def __same_json_in_log(decoded_message, line, pattern):
+        extracted_json = PrhLibrary.__extract_json(line, pattern)
+        if extracted_json is not None:
+            print("Found json: " + extracted_json)
+            try:
+                if json.loads(extracted_json) == decoded_message:
+                    return True
+            except json.JSONDecodeError:
+                print("Could not decode")
+        return False
+
+    @staticmethod
+    def __extract_json(line, pattern):
+        full_message = PrhLibrary.__extract_full_message_from_line(line)
+        if full_message is not None:
+            match = pattern.match(full_message)
+            if match:
+                return match.group(1).replace("\\n", "\n").replace("\\t", "\t")
+        return None
+
+    @staticmethod
+    def __extract_full_message_from_line(line):
+        split = line.split("|")
+        if len(split) > 3:
+            return split[3]
+        return None