7 class PrhLibrary(object):
13 def check_for_log(search_for):
14 client = docker.from_env()
15 container = client.containers.get('prh')
16 #print ("Check for log searches for pattern: ", search_for )
17 for line in container.logs(stream=True):
18 #print ("Check for log analysis line: ", line )
19 if search_for in line.strip():
25 def create_pnf_ready_notification_from_ves(json_file):
26 json_to_python = json.loads(json_file)
27 ipv4 = PrhLibrary.extract_ip_v4(json_to_python)
28 ipv6 = PrhLibrary.extract_ip_v6(json_to_python)
29 correlation_id = PrhLibrary.extract_correlation_id(json_to_python)
30 serial_number = PrhLibrary.extract_serial_number(json_to_python)
31 vendor_name = PrhLibrary.extract_vendor_name(json_to_python)
32 model_number = PrhLibrary.extract_model_number(json_to_python)
33 unit_type = PrhLibrary.extract_unit_type(json_to_python)
35 str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serialNumber":"' + serial_number + '","vendorName":"' + vendor_name + '","modelNumber":"' + model_number + '","unitType":"' + unit_type + '"}'
36 python_to_json = json.dumps(str_json)
37 return python_to_json.replace("\\", "")[1:-1]
40 def create_pnf_ready_notification_as_pnf_ready(json_file):
41 json_to_python = json.loads(json_file)
42 ipv4 = PrhLibrary.extract_ip_v4(json_to_python)
43 ipv6 = PrhLibrary.extract_ip_v6(json_to_python)
44 correlation_id = PrhLibrary.extract_correlation_id(json_to_python)
45 serial_number = PrhLibrary.extract_serial_number(json_to_python)
46 vendor_name = PrhLibrary.extract_vendor_name(json_to_python)
47 model_number = PrhLibrary.extract_model_number(json_to_python)
48 unit_type = PrhLibrary.extract_unit_type(json_to_python)
50 nf_role = json_to_python.get("event").get("commonEventHeader").get("nfNamingCode") if "nfNamingCode" in json_to_python["event"]["commonEventHeader"] else ""
52 str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + vendor_name + '","equip-model":"' + model_number + '","equip-type":"' + unit_type + '","nf-role":"' + nf_role + '","sw-version":""}'
53 python_to_json = json.dumps(str_json)
54 return python_to_json.replace("\\", "")[1:-1]
57 def extract_ip_v4(content):
58 return content.get("event").get("pnfRegistrationFields").get("oamV4IpAddress") if "oamV4IpAddress" in content["event"]["pnfRegistrationFields"] else ""
61 def extract_ip_v6(content):
62 return content.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in content["event"]["pnfRegistrationFields"] else ""
65 def extract_correlation_id(content):
66 return content.get("event").get("commonEventHeader").get("sourceName") if "sourceName" in content["event"]["commonEventHeader"] else ""
69 def extract_serial_number(content):
70 return content.get("event").get("pnfRegistrationFields").get("serialNumber") if "serialNumber" in content["event"]["pnfRegistrationFields"] else ""
73 def extract_vendor_name(content):
74 return content.get("event").get("pnfRegistrationFields").get("vendorName") if "vendorName" in content["event"]["pnfRegistrationFields"] else ""
77 def extract_model_number(content):
78 return content.get("event").get("pnfRegistrationFields").get("modelNumber") if "modelNumber" in content["event"]["pnfRegistrationFields"] else ""
81 def extract_unit_type(content):
82 return content.get("event").get("pnfRegistrationFields").get("unitType") if "unitType" in content["event"]["pnfRegistrationFields"] else ""
85 def create_pnf_name(json_file):
86 json_to_python = json.loads(json_file)
87 correlation_id = json_to_python.get("sourceName")
91 def ensure_container_is_running(name):
92 client = docker.from_env()
94 if not PrhLibrary.is_in_status(client, name, "running"):
95 print ("starting container", name)
96 container = client.containers.get(name)
98 PrhLibrary.wait_for_status(client, name, "running")
100 PrhLibrary.print_status(client)
103 def ensure_container_is_exited(name):
104 client = docker.from_env()
106 if not PrhLibrary.is_in_status(client, name, "exited"):
107 print ("stopping container", name)
108 container = client.containers.get(name)
110 PrhLibrary.wait_for_status(client, name, "exited")
112 PrhLibrary.print_status(client)
115 def print_status(client):
116 print("containers status")
117 for c in client.containers.list(all=True):
118 print(c.name, " ", c.status)
121 def wait_for_status(client, name, status):
122 while not PrhLibrary.is_in_status(client, name, status):
123 print ("waiting for container: ", name, "to be in status: ", status)
127 def is_in_status(client, name, status):
128 return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
131 def create_invalid_notification(self, json_file):
132 invalidate_input = self.create_pnf_ready_notification_from_ves(json_file).replace("\":", "\": ") \
133 .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam","oamV6IpAddress").replace("}",'')
134 invalidate_input__and_add_additional_fields = invalidate_input + ',"nfNamingCode": ""' + "," + '"softwareVersion": ""' +"\\n"
135 return invalidate_input__and_add_additional_fields