pass
@staticmethod
- def check_for_log(search_for):
+ def find_log_entry(search_for):
+ print (type(search_for))
client = docker.from_env()
container = client.containers.get('prh')
print ("Check for log searches for pattern: ", search_for )
vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "vendorName", "vendorName")
model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "modelNumber", "modelNumber")
unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "unitType", "unitType")
- additional_fields = PrhLibrary.extract_additional_fields(json_to_python, "additionalFields")
+
+ additional_fields = PrhLibrary.extract_additional_fields(json_to_python)
str_json = '{' + correlation_id + ipv4 + ipv6 + serial_number + vendor_name + model_number + unit_type + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields
return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}'
def create_pnf_ready_notification_as_pnf_ready(json_file):
json_to_python = json.loads(json_file)
correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
- serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "serial-number", "serialNumber")
- vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "equip-vendor", "vendorName")
- model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "equip-model", "modelNumber")
- unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "equip-type", "unitType")
- additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python, "additionalFields")
- nf_role = json_to_python.get("event").get("commonEventHeader").get("nfNamingCode") if "nfNamingCode" in json_to_python["event"]["commonEventHeader"] else ""
+ additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python)
- str_json = '{' + correlation_id + serial_number + vendor_name + model_number + unit_type + '"nf-role":"' + nf_role + '","sw-version":"",' + additional_fields
+ str_json = '{' + correlation_id + additional_fields
return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1]
@staticmethod
- def extract_additional_fields(content, name):
- fields = content.get("event").get("pnfRegistrationFields").get(name) if name in content["event"]["pnfRegistrationFields"] else []
+ def extract_additional_fields_value(content):
+ fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
+ if len(fields) == 0:
+ return ""
+ return PrhLibrary.build_additional_fields_json(fields)
+
+ @staticmethod
+ def extract_additional_fields(content):
+ fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
if fields == []:
- return '"additionalFields":' + 'null'
- res = '"' + name + '":{'
- for f in fields:
- res += '"' + f + '"' + ':' + '"' + fields.get(f) + '",'
- return res.rstrip(',') + '},'
+ return '"additionalFields":null'
+ return PrhLibrary.build_additional_fields_json(fields)
@staticmethod
- def extract_additional_fields_value(content, name):
- fields = content.get("event").get("pnfRegistrationFields").get(name) if name in content["event"]["pnfRegistrationFields"] else []
- if fields == [] or len(fields) == 0:
- return ""
- res = '"' + name + '":{'
+ def get_additional_fields_as_key_value_pairs(content):
+ return content.get("event").get("pnfRegistrationFields").get(
+ "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else []
+
+ @staticmethod
+ def build_additional_fields_json(fields):
+ res = '"additionalFields":{'
for f in fields:
- res += '"' + f + '"' + ':' + '"' + fields.get(f) + '",'
+ res += '"' + f + '":"' + fields.get(f) + '",'
return res.rstrip(',') + '},'
@staticmethod
@staticmethod
def create_pnf_name(json_file):
json_to_python = json.loads(json_file)
- correlation_id = json_to_python.get("sourceName")
+ correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
return correlation_id
@staticmethod
@staticmethod
def is_in_status(client, name, status):
- return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
+ return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
\ No newline at end of file