def check_for_log(search_for):
client = docker.from_env()
container = client.containers.get('prh')
+ print ("Check for log searches for pattern: ", search_for )
for line in container.logs(stream=True):
+ print ("Check for log analysis line: ", line )
if search_for in line.strip():
return True
else:
return False
@staticmethod
- def create_pnf_ready_notification(json_file):
+ def create_invalid_notification(json_file):
json_to_python = json.loads(json_file)
- ipv4 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV4IpAddress")
- ipv6 = json_to_python.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in json_to_python["event"]["pnfRegistrationFields"] else ""
- serial_number = json_to_python.get("event").get("pnfRegistrationFields").get("serial-number") if "serial-number" in json_to_python["event"]["pnfRegistrationFields"] else ""
- equip_vendor = json_to_python.get("event").get("pnfRegistrationFields").get("equip-vendor") if "equip-vendor" in json_to_python["event"]["pnfRegistrationFields"] else ""
- equip_model = json_to_python.get("event").get("pnfRegistrationFields").get("equip-model") if "equip-model" in json_to_python["event"]["pnfRegistrationFields"] else ""
- equip_type = json_to_python.get("event").get("pnfRegistrationFields").get("equip-type") if "equip-type" in json_to_python["event"]["pnfRegistrationFields"] else ""
- nf_role = json_to_python.get("event").get("pnfRegistrationFields").get("nf-role") if "nf-role" in json_to_python["event"]["pnfRegistrationFields"] else ""
- sw_version = json_to_python.get("event").get("pnfRegistrationFields").get("sw-version") if "sw-version" in json_to_python["event"]["pnfRegistrationFields"] else ""
- correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
- str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + equip_vendor + '","equip-model":"' + equip_model + '","equip-type":"' + equip_type + '","nf-role":"' + nf_role + '","sw-version":"' + sw_version + '"}'
- python_to_json = json.dumps(str_json)
- return python_to_json.replace("\\", "")[1:-1]
+ correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
+ ipv4 = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "oamV4IpAddress", "oamV4IpAddress")
+ ipv6 = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "oamV6IpAddress", "oamV6IpAddress")
+ serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "serialNumber", "serialNumber")
+ vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "vendorName", "vendorName")
+ model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "modelNumber", "modelNumber")
+ unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "unitType", "unitType")
+
+ additional_fields = PrhLibrary.extract_additional_fields(json_to_python)
+
+ str_json = '{' + correlation_id + ipv4 + ipv6 + serial_number + vendor_name + model_number + unit_type + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields
+ return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}'
+
+ @staticmethod
+ def create_pnf_ready_notification_as_pnf_ready(json_file):
+ json_to_python = json.loads(json_file)
+ correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
+
+ additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python)
+
+ str_json = '{' + correlation_id + additional_fields
+
+ return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1]
+
+ @staticmethod
+ def extract_additional_fields_value(content):
+ fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
+ if len(fields) == 0:
+ return ""
+ return PrhLibrary.build_additional_fields_json(fields)
+
+ @staticmethod
+ def extract_additional_fields(content):
+ fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
+ if fields == []:
+ return '"additionalFields":null'
+ return PrhLibrary.build_additional_fields_json(fields)
+
+ @staticmethod
+ def get_additional_fields_as_key_value_pairs(content):
+ return content.get("event").get("pnfRegistrationFields").get(
+ "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else []
+
+ @staticmethod
+ def build_additional_fields_json(fields):
+ res = '"additionalFields":{'
+ for f in fields:
+ res += '"' + f + '":"' + fields.get(f) + '",'
+ return res.rstrip(',') + '},'
+
+ @staticmethod
+ def extract_value_from_pnfRegistrationFields(content, name, key):
+ return '"' + name + '":"' + (content.get("event").get("pnfRegistrationFields").get(key) + '",' if key in content["event"]["pnfRegistrationFields"] else '",')
+
+ @staticmethod
+ def extract_correlation_id_value(content, name):
+ return '"' + name + '":"' + (content.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in content["event"]["commonEventHeader"] else '",')
@staticmethod
def create_pnf_name(json_file):
json_to_python = json.loads(json_file)
- correlation_id = json_to_python.get("sourceName")
+ correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
return correlation_id
@staticmethod
@staticmethod
def is_in_status(client, name, status):
return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
-
-
- def create_invalid_notification(self, json_file):
- return self.create_pnf_ready_notification(json_file).replace("\":", "\": ")\
- .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam", "oamV6IpAddress")\
- .replace("}", "\\n}")