7 class PrhLibrary(object):
def check_for_log(search_for):
    """Report whether the log of the 'prh' container contains a given text.

    The log of the Docker container named 'prh' is read as a stream and
    scanned line by line; scanning stops at the first matching line.

    :param search_for: text (str or bytes) to look for inside a log line.
    :return: True as soon as a line containing ``search_for`` is seen,
        False if the log stream ends without any match.
    """
    client = docker.from_env()
    container = client.containers.get('prh')

    # The log stream may yield bytes (it does with the Docker SDK on
    # Python 3); compare text with text so the ``in`` test cannot raise
    # a TypeError on mixed str/bytes operands.
    if isinstance(search_for, bytes):
        search_for = search_for.decode("utf-8", "replace")

    for line in container.logs(stream=True):
        if isinstance(line, bytes):
            line = line.decode("utf-8", "replace")
        if search_for in line.strip():
            return True
    # NOTE(review): a streamed log presumably keeps following the container,
    # so this is only reached once the stream is closed - confirm.
    return False
def create_pnf_ready_notification(json_file):
    """Build the PNF-ready notification JSON text from a registration event.

    Reads ``event.commonEventHeader.sourceName`` and the OAM addresses in
    ``event.pnfRegistrationFields`` from the given JSON document and
    returns a compact JSON object with the keys ``correlationId``,
    ``ipaddress-v4-oam`` and ``ipaddress-v6-oam`` (in that order, with no
    whitespace between tokens).

    :param json_file: JSON text of the registration event.
    :return: the notification as a JSON string. A missing
        ``oamV6IpAddress`` is emitted as an empty string.
    :raises AttributeError: if ``event``, ``pnfRegistrationFields`` or
        ``commonEventHeader`` is absent from the document.
    """
    event = json.loads(json_file).get("event")
    registration = event.get("pnfRegistrationFields")
    ipv4 = registration.get("oamV4IpAddress")
    ipv6 = registration.get("oamV6IpAddress", "")
    correlation_id = event.get("commonEventHeader").get("sourceName")

    # Serialise every key and value through json.dumps instead of gluing raw
    # strings together: quotes, backslashes and non-ASCII characters inside
    # a value are then escaped properly and the result is always valid JSON.
    # The pairs are joined by hand so the key order and the compact
    # separators do not depend on dict ordering.
    fields = (
        ("correlationId", correlation_id),
        ("ipaddress-v4-oam", ipv4),
        ("ipaddress-v6-oam", ipv6),
    )
    body = ",".join(json.dumps(key) + ":" + json.dumps(value) for key, value in fields)
    return "{" + body + "}"
def create_pnf_name(json_file):
    """Extract the PNF name from a JSON document.

    :param json_file: JSON text holding a top-level ``sourceName`` key.
    :return: the value stored under ``sourceName``, or None when the key
        is absent.
    """
    json_to_python = json.loads(json_file)
    # Hand the looked-up value back to the caller; without the return the
    # lookup result was discarded and the function always yielded None.
    return json_to_python.get("sourceName")
def ensure_container_is_running(name):
    """Start the named Docker container unless it is already running.

    When the container is not in the "running" status it is started and
    the call blocks until Docker reports it as running. The status of all
    containers is printed afterwards in either case.

    :param name: name of the container to bring up.
    """
    client = docker.from_env()

    if not PrhLibrary.is_in_status(client, name, "running"):
        print ("starting container", name)
        container = client.containers.get(name)
        # Actually request the start; merely fetching the container leaves
        # it untouched and the wait below could never complete.
        container.start()
        PrhLibrary.wait_for_status(client, name, "running")

    PrhLibrary.print_status(client)
def ensure_container_is_exited(name):
    """Stop the named Docker container unless it has already exited.

    When the container is not in the "exited" status it is stopped and the
    call blocks until Docker reports it as exited. The status of all
    containers is printed afterwards in either case.

    :param name: name of the container to shut down.
    """
    client = docker.from_env()

    if not PrhLibrary.is_in_status(client, name, "exited"):
        print ("stopping container", name)
        container = client.containers.get(name)
        # Actually request the stop; merely fetching the container leaves
        # it untouched and the wait below could never complete.
        container.stop()
        PrhLibrary.wait_for_status(client, name, "exited")

    PrhLibrary.print_status(client)
def print_status(client):
    """Print the name and status of every container known to the client.

    Stopped containers are included because the listing is requested
    with ``all=True``.

    :param client: Docker client used to list the containers.
    """
    print("containers status")
    every_container = client.containers.list(all=True)
    for entry in every_container:
        print(entry.name, " ", entry.status)
def wait_for_status(client, name, status, poll_interval=1):
    """Block until the named container reaches the requested status.

    The status is polled through ``PrhLibrary.is_in_status``; a progress
    message is printed before every retry. There is no timeout: the call
    only returns once the status matches.

    :param client: Docker client used for the status queries.
    :param name: name of the container to watch.
    :param status: Docker status to wait for (e.g. "running", "exited").
    :param poll_interval: seconds to pause between two polls.
    """
    # Imported locally so this method does not depend on the module's
    # import block, which is not visible from here.
    import time

    while not PrhLibrary.is_in_status(client, name, status):
        print ("waiting for container: ", name, "to be in status: ", status)
        # Pause between polls instead of hammering the Docker daemon and
        # flooding the output in a tight loop.
        time.sleep(poll_interval)
def is_in_status(client, name, status):
    """Tell whether exactly one container matches both name and status.

    The name is wrapped as ``^/<name>$`` before being passed as the
    ``name`` filter, and stopped containers are included in the listing
    (``all=True``).

    :param client: Docker client used to list the containers.
    :param name: container name to look for.
    :param status: status the container is expected to be in.
    :return: True when the filtered listing holds exactly one container.
    """
    criteria = {"name": "^/"+name+"$", "status": status}
    matches = client.containers.list(all=True, filters=criteria)
    return len(matches) == 1
79 def create_invalid_notification(self, json_file):
80 return self.create_pnf_ready_notification(json_file).replace("\":", "\": ")\
81 .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam", "oamV6IpAddress")\