7 class PrhLibrary(object):
# Scan the log stream of the docker container named 'prh' for `search_for`.
#
# Prints the type of `search_for`, the pattern being searched for, and every
# log line as it is read from the stream.
#
# NOTE(review): this listing jumps from line 20 to line 26, so the body of the
# `if` below and the value the function returns are not visible here.
# NOTE(review): container.logs(stream=True) presumably yields bytes, in which
# case `search_for` has to be bytes as well for the `in` test - confirm.
13 def find_log_entry(search_for):
14 print (type(search_for))
# Client configured from the environment (docker.from_env()).
15 client = docker.from_env()
# The container name is hard-coded to 'prh'.
16 container = client.containers.get('prh')
17 print ("Check for log searches for pattern: ", search_for )
# stream=True makes logs() iterable, so lines are examined one at a time.
18 for line in container.logs(stream=True):
19 print ("Check for log analysis line: ", line )
# Substring test against the line with surrounding whitespace stripped.
20 if search_for in line.strip():
def create_invalid_notification(json_file):
    """Build the expected "invalid notification" text for a registration event.

    Args:
        json_file: JSON text holding an array of events; only the first
            element of the array is used.

    Returns:
        A string that starts with ``{`` and lists, in order, correlationId,
        oamV4IpAddress, oamV6IpAddress, serialNumber, vendorName,
        modelNumber, unitType, nfNamingCode, softwareVersion and the
        additional fields. Every ``":`` is followed by a space, trailing
        commas are stripped, and the text ends with a literal backslash,
        the letter ``n`` and a closing brace.
    """
    event = json.loads(json_file)[0]
    correlation_id = PrhLibrary.extract_correlation_id_value(event, "correlationId")

    # Each registration field is emitted under the same name it is read from.
    registration_fields = "".join(
        PrhLibrary.extract_value_from_pnfRegistrationFields(event, field, field)
        for field in (
            "oamV4IpAddress",
            "oamV6IpAddress",
            "serialNumber",
            "vendorName",
            "modelNumber",
            "unitType",
        )
    )

    additional_fields = PrhLibrary.extract_additional_fields(event)

    str_json = (
        '{' + correlation_id + registration_fields
        + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields
    )

    # json.dumps quotes and escapes the text; the escapes are then removed by
    # dropping every backslash and the surrounding quotes are sliced off.
    # ensure_ascii=False keeps non-ASCII characters intact: with the default
    # they are written as \uXXXX and the backslash removal would leave a
    # corrupted "uXXXX" sequence in the result.
    unescaped = json.dumps(str_json, ensure_ascii=False).replace("\\", "")[1:-1]
    return unescaped.replace("\":", "\": ").rstrip(',') + '\\n}'
def create_pnf_ready_notification_as_pnf_ready(json_file):
    """Build the expected PNF_READY notification text for a registration event.

    Args:
        json_file: JSON text of a single event object.

    Returns:
        A string of the form ``{"correlationId":"...",<additional fields>}``
        with any trailing comma removed before the closing brace.
    """
    event = json.loads(json_file)
    correlation_id = PrhLibrary.extract_correlation_id_value(event, "correlationId")
    additional_fields = PrhLibrary.extract_additional_fields_value(event)

    # The fragments end with a comma; drop it before closing the object.
    str_json = ('{' + correlation_id + additional_fields).rstrip(',') + '}'

    # json.dumps quotes and escapes the text; the escapes are then removed by
    # dropping every backslash and the surrounding quotes are sliced off.
    # ensure_ascii=False keeps non-ASCII characters intact: with the default
    # they are written as \uXXXX and the backslash removal would leave a
    # corrupted "uXXXX" sequence in the result.
    return json.dumps(str_json, ensure_ascii=False).replace("\\", "")[1:-1]
# Render the event's additional fields as a JSON fragment.
#
# Looks the fields up with get_additional_fields_as_key_value_pairs() and
# formats them with build_additional_fields_json().
#
# NOTE(review): this listing skips lines 55-56 between the lookup and the
# return; whatever special-casing happens there (presumably for the
# "no additional fields" case) is not visible here - confirm against the file.
53 def extract_additional_fields_value(content):
54 fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
57 return PrhLibrary.build_additional_fields_json(fields)
# Render the event's additional fields as a JSON fragment, with a null form.
#
# Returns either the literal text '"additionalFields":null' or the output of
# build_additional_fields_json() for the looked-up fields.
#
# NOTE(review): this listing skips line 62, which presumably holds the
# condition guarding the first return below (two consecutive returns would
# otherwise make the second unreachable) - confirm against the file.
60 def extract_additional_fields(content):
61 fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
# Note: unlike the fragment built on the last line, this text carries no trailing comma.
63 return '"additionalFields":null'
64 return PrhLibrary.build_additional_fields_json(fields)
def get_additional_fields_as_key_value_pairs(content):
    """Return the ``additionalFields`` entry of a PNF registration event.

    Args:
        content: Parsed event dict with an ``event`` entry that contains
            ``pnfRegistrationFields``.

    Returns:
        The value stored under ``additionalFields``, or an empty list when
        the key is absent or its value is null.

    Raises:
        KeyError: if ``event`` or ``pnfRegistrationFields`` is missing.
    """
    registration_fields = content["event"]["pnfRegistrationFields"]
    additional_fields = registration_fields.get("additionalFields")
    # An explicit JSON null used to be handed back as None instead of the
    # empty sentinel; treat it the same as a missing key. The empty list is
    # kept as the "no fields" value for backward compatibility.
    return [] if additional_fields is None else additional_fields
# Format `fields` as the text '"additionalFields":{"k":"v",...},'.
#
# Each entry is appended as '"<key>":"<value>",'; the trailing comma of the
# last entry is stripped before the object is closed, and the closing brace
# is itself followed by a comma.
#
# NOTE(review): this listing skips line 74, which presumably is the loop
# header that binds `f` to each key of `fields` - confirm against the file.
# `fields` must offer .get(), and the values must be strings for the
# concatenation on line 75 to succeed.
72 def build_additional_fields_json(fields):
73 res = '"additionalFields":{'
75 res += '"' + f + '":"' + fields.get(f) + '",'
76 return res.rstrip(',') + '},'
def extract_value_from_pnfRegistrationFields(content, name, key):
    """Render one ``pnfRegistrationFields`` entry as a ``"name":"value",`` fragment.

    Args:
        content: Parsed event dict with an ``event`` entry that contains
            ``pnfRegistrationFields``.
        name: Name to emit in the fragment.
        key: Key to read from ``pnfRegistrationFields``.

    Returns:
        ``"<name>":"<value>",``; the value is empty when ``key`` is absent
        or null.

    Raises:
        KeyError: if ``event`` or ``pnfRegistrationFields`` is missing.
    """
    registration_fields = content["event"]["pnfRegistrationFields"]
    value = registration_fields.get(key)
    if value is None:
        # A key present with a JSON null used to raise TypeError on the
        # concatenation below; render it like a missing key instead.
        value = ""
    return '"' + name + '":"' + value + '",'
def extract_correlation_id_value(content, name):
    """Render the event's ``sourceName`` as a ``"name":"value",`` fragment.

    Args:
        content: Parsed event dict with an ``event`` entry that contains
            ``commonEventHeader``.
        name: Name to emit in the fragment.

    Returns:
        ``"<name>":"<sourceName>",``; the value is empty when ``sourceName``
        is absent or null.

    Raises:
        KeyError: if ``event`` or ``commonEventHeader`` is missing.
    """
    header = content["event"]["commonEventHeader"]
    source_name = header.get("sourceName")
    if source_name is None:
        # A sourceName present with a JSON null used to raise TypeError on
        # the concatenation below; render it like a missing key instead.
        source_name = ""
    return '"' + name + '":"' + source_name + '",'
# Derive a value from the event's commonEventHeader.sourceName.
#
# Parses `json_file` as a single JSON object and reads sourceName from
# event.commonEventHeader.
#
# NOTE(review): this listing skips lines 90-92, so the statement that returns
# (or otherwise uses) `correlation_id` is not visible here - confirm.
87 def create_pnf_name(json_file):
88 json_to_python = json.loads(json_file)
# Yields sourceName with '",' appended, or just '",' when sourceName is absent.
89 correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
# Bring the named docker container to the "running" status if it is not there yet.
#
# When is_in_status() reports that the container is not "running", the
# container is fetched by name and the code waits until it reports "running".
# The status of all containers is printed via print_status().
#
# NOTE(review): this listing skips line 99, between fetching the container and
# waiting for the status; presumably that line starts the container - confirm.
# Indentation is not preserved in this listing, so which statements sit inside
# the `if` cannot be told from here.
93 def ensure_container_is_running(name):
94 client = docker.from_env()
96 if not PrhLibrary.is_in_status(client, name, "running"):
97 print ("starting container", name)
98 container = client.containers.get(name)
100 PrhLibrary.wait_for_status(client, name, "running")
102 PrhLibrary.print_status(client)
# Bring the named docker container to the "exited" status if it is not there yet.
#
# When is_in_status() reports that the container is not "exited", the
# container is fetched by name and the code waits until it reports "exited".
# The status of all containers is printed via print_status().
#
# NOTE(review): this listing skips line 111, between fetching the container and
# waiting for the status; presumably that line stops the container - confirm.
# Indentation is not preserved in this listing, so which statements sit inside
# the `if` cannot be told from here.
105 def ensure_container_is_exited(name):
106 client = docker.from_env()
108 if not PrhLibrary.is_in_status(client, name, "exited"):
109 print ("stopping container", name)
110 container = client.containers.get(name)
112 PrhLibrary.wait_for_status(client, name, "exited")
114 PrhLibrary.print_status(client)
def print_status(client):
    """Print the name and status of every container known to ``client``.

    Args:
        client: Docker client whose ``containers.list`` is queried with
            ``all=True``.
    """
    print("containers status")
    every_container = client.containers.list(all=True)
    for container in every_container:
        print(container.name, " ", container.status)
# Block until the named container is in the requested status.
#
# Re-checks is_in_status() in a loop and prints a progress message on every
# pass in which the status has not been reached yet. There is no visible upper
# bound on the number of passes.
#
# NOTE(review): this listing skips line 126, so any pause between polls
# (or other loop-body statement) is not visible here - confirm.
123 def wait_for_status(client, name, status):
124 while not PrhLibrary.is_in_status(client, name, status):
125 print ("waiting for container: ", name, "to be in status: ", status)
def is_in_status(client, name, status):
    """Tell whether exactly one container matches ``name`` and ``status``.

    Args:
        client: Docker client whose ``containers.list`` is queried.
        name: Container name; it is wrapped as ``^/<name>$`` for the name
            filter.
        status: Status value passed to the status filter.

    Returns:
        True when the filtered listing (including non-running containers,
        ``all=True``) contains exactly one entry, False otherwise.
    """
    name_pattern = "^/" + name + "$"
    selection = {"name": name_pattern, "status": status}
    matching = client.containers.list(all=True, filters=selection)
    return len(matching) == 1