+ return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
+
+ @staticmethod
+ def create_pnf_name(json_file):
+ json_to_python = json.loads(json_file)
+ correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
+ return correlation_id
+
+ @staticmethod
+ def __get_additional_fields_as_key_value_pairs(content):
+ return content.get("event").get("pnfRegistrationFields").get(
+ "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else {}
+
+ @staticmethod
+ def __extract_value_from_pnfRegistrationFields(content, key):
+ return content["event"]["pnfRegistrationFields"][key] if key in content["event"]["pnfRegistrationFields"] else ''
+
+ @staticmethod
+ def __extract_correlation_id_value(content):
+ return content["event"]["commonEventHeader"]["sourceName"] if "sourceName" in content["event"]["commonEventHeader"] else ''
+
+ @staticmethod
+ def __extract_nf_role(content):
+ return content["event"]["commonEventHeader"]["nfNamingCode"] if "nfNamingCode" in content["event"]["commonEventHeader"] else ''
+
+ @staticmethod
+ def __same_json_in_log(decoded_message, line, pattern):
+ extracted_json = PrhLibrary.__extract_json(line, pattern)
+ if extracted_json is not None:
+ print("Found json: " + extracted_json)
+ try:
+ if json.loads(extracted_json) == decoded_message:
+ return True
+ except json.JSONDecodeError:
+ print("Could not decode")
+ return False
+
+ @staticmethod
+ def __extract_json(line, pattern):
+ full_message = PrhLibrary.__extract_full_message_from_line(line)
+ if full_message is not None:
+ match = pattern.match(full_message)
+ if match:
+ return match.group(1).replace("\\n", "\n").replace("\\t", "\t")
+ return None
+
+ @staticmethod
+ def __extract_full_message_from_line(line):
+ split = line.split("|")
+ if len(split) > 3:
+ return split[3]
+ return None