7 class PrhLibrary(object):
13 def check_for_log(search_for):
14 client = docker.from_env()
15 container = client.containers.get('prh')
16 print ("Check for log searches for pattern: ", search_for )
17 for line in container.logs(stream=True):
18 print ("Check for log analysis line: ", line )
19 if search_for in line.strip():
25 def create_invalid_notification(json_file):
26 json_to_python = json.loads(json_file)
27 correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
28 ipv4 = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "oamV4IpAddress", "oamV4IpAddress")
29 ipv6 = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "oamV6IpAddress", "oamV6IpAddress")
30 serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "serialNumber", "serialNumber")
31 vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "vendorName", "vendorName")
32 model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "modelNumber", "modelNumber")
33 unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "unitType", "unitType")
34 additional_fields = PrhLibrary.extract_additional_fields(json_to_python, "additionalFields")
36 str_json = '{' + correlation_id + ipv4 + ipv6 + serial_number + vendor_name + model_number + unit_type + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields
37 return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}'
40 def create_pnf_ready_notification_as_pnf_ready(json_file):
41 json_to_python = json.loads(json_file)
42 correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
43 serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "serial-number", "serialNumber")
44 vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "equip-vendor", "vendorName")
45 model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "equip-model", "modelNumber")
46 unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(json_to_python, "equip-type", "unitType")
47 additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python, "additionalFields")
49 nf_role = json_to_python.get("event").get("commonEventHeader").get("nfNamingCode") if "nfNamingCode" in json_to_python["event"]["commonEventHeader"] else ""
51 str_json = '{' + correlation_id + serial_number + vendor_name + model_number + unit_type + '"nf-role":"' + nf_role + '","sw-version":"",' + additional_fields
53 return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1]
56 def extract_additional_fields(content, name):
57 fields = content.get("event").get("pnfRegistrationFields").get(name) if name in content["event"]["pnfRegistrationFields"] else []
59 return '"additionalFields":' + 'null'
60 res = '"' + name + '":{'
62 res += '"' + f + '"' + ':' + '"' + fields.get(f) + '",'
63 return res.rstrip(',') + '},'
66 def extract_additional_fields_value(content, name):
67 fields = content.get("event").get("pnfRegistrationFields").get(name) if name in content["event"]["pnfRegistrationFields"] else []
68 if fields == [] or len(fields) == 0:
70 res = '"' + name + '":{'
72 res += '"' + f + '"' + ':' + '"' + fields.get(f) + '",'
73 return res.rstrip(',') + '},'
76 def extract_value_from_pnfRegistrationFields(content, name, key):
77 return '"' + name + '":"' + (content.get("event").get("pnfRegistrationFields").get(key) + '",' if key in content["event"]["pnfRegistrationFields"] else '",')
80 def extract_correlation_id_value(content, name):
81 return '"' + name + '":"' + (content.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in content["event"]["commonEventHeader"] else '",')
84 def create_pnf_name(json_file):
85 json_to_python = json.loads(json_file)
86 correlation_id = json_to_python.get("sourceName")
90 def ensure_container_is_running(name):
91 client = docker.from_env()
93 if not PrhLibrary.is_in_status(client, name, "running"):
94 print ("starting container", name)
95 container = client.containers.get(name)
97 PrhLibrary.wait_for_status(client, name, "running")
99 PrhLibrary.print_status(client)
102 def ensure_container_is_exited(name):
103 client = docker.from_env()
105 if not PrhLibrary.is_in_status(client, name, "exited"):
106 print ("stopping container", name)
107 container = client.containers.get(name)
109 PrhLibrary.wait_for_status(client, name, "exited")
111 PrhLibrary.print_status(client)
114 def print_status(client):
115 print("containers status")
116 for c in client.containers.list(all=True):
117 print(c.name, " ", c.status)
120 def wait_for_status(client, name, status):
121 while not PrhLibrary.is_in_status(client, name, status):
122 print ("waiting for container: ", name, "to be in status: ", status)
126 def is_in_status(client, name, status):
127 return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1