import json import re import docker import time class PrhLibrary(object): def __init__(self): pass @staticmethod def find_one_of_log_entryies(searched_entries): print(type(searched_entries)) client = docker.from_env() container = client.containers.get('prh') print("Check for log searches for pattern: ", searched_entries) for line in container.logs(stream=True): print("Check for log analysis line: ", line ) for searched_entry in searched_entries: if searched_entry in line.strip(): return True else: return False @staticmethod def find_log_json(prefix, json_message): print("Looking for:") print("Prefix: " + str(prefix)) print("Json: " + str(json_message)) try: decoded_message = json.loads(json_message) except json.JSONDecodeError: print("Could not decode given message") return False pattern = re.compile(prefix + "(.*)$") client = docker.from_env() container = client.containers.get('prh') for line in container.logs(stream=True): print("Check for log analysis line: ", line ) if PrhLibrary.__same_json_in_log(decoded_message, line, pattern): return True else: return False @staticmethod def create_invalid_notification(json_file): output = {} input = json.loads(json_file) output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input) output["oamV4IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV4IpAddress") output["oamV6IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV6IpAddress") output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber") output["vendorName"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName") output["modelNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber") output["unitType"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType") output['nfNamingCode'] = '' output['softwareVersion'] = '' output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input) return json.dumps(output) @staticmethod def create_pnf_ready_notification_as_pnf_ready(json_file): output = {} input = json.loads(json_file)[0] output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input) output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber") output["equip-vendor"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName") output["equip-model"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber") output["equip-type"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType") output["nf-role"] = PrhLibrary.__extract_nf_role(input) output["sw-version"] = "" output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input) return json.dumps(output) @staticmethod def ensure_container_is_running(name): client = docker.from_env() if not PrhLibrary.is_in_status(client, name, "running"): print ("starting container", name) container = client.containers.get(name) container.start() PrhLibrary.wait_for_status(client, name, "running") PrhLibrary.print_status(client) @staticmethod def ensure_container_is_exited(name): client = docker.from_env() if not PrhLibrary.is_in_status(client, name, "exited"): print ("stopping container", name) container = client.containers.get(name) container.stop() PrhLibrary.wait_for_status(client, name, "exited") PrhLibrary.print_status(client) @staticmethod def print_status(client): print("containers status") for c in client.containers.list(all=True): print(c.name, " ", c.status) @staticmethod def wait_for_status(client, name, status): while not PrhLibrary.is_in_status(client, name, status): print ("waiting for container: ", name, "to be in status: ", status) time.sleep(3) @staticmethod def is_in_status(client, name, status): return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1 @staticmethod def create_pnf_name(json_file): json_to_python = json.loads(json_file) correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",' return correlation_id @staticmethod def __get_additional_fields_as_key_value_pairs(content): return content.get("event").get("pnfRegistrationFields").get( "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else {} @staticmethod def __extract_value_from_pnfRegistrationFields(content, key): return content["event"]["pnfRegistrationFields"][key] if key in content["event"]["pnfRegistrationFields"] else '' @staticmethod def __extract_correlation_id_value(content): return content["event"]["commonEventHeader"]["sourceName"] if "sourceName" in content["event"]["commonEventHeader"] else '' @staticmethod def __extract_nf_role(content): return content["event"]["commonEventHeader"]["nfNamingCode"] if "nfNamingCode" in content["event"]["commonEventHeader"] else '' @staticmethod def __same_json_in_log(decoded_message, line, pattern): extracted_json = PrhLibrary.__extract_json(line, pattern) if extracted_json is not None: print("Found json: " + extracted_json) try: if json.loads(extracted_json) == decoded_message: return True except json.JSONDecodeError: print("Could not decode") return False @staticmethod def __extract_json(line, pattern): full_message = PrhLibrary.__extract_full_message_from_line(line) if full_message is not None: match = pattern.match(full_message) if match: return match.group(1).replace("\\n", "\n").replace("\\t", "\t") return None @staticmethod def __extract_full_message_from_line(line): split = line.split("|") if len(split) > 3: return split[3] return None