dc589369645bcd48a336c70708b0f00ba7e0c143
[integration/csit.git] / tests / dcaegen2 / prh-testcases / resources / PrhLibrary.py
1 import json
2
3 import docker
4 import time
5
6
class PrhLibrary(object):
    """Helpers for the PRH test cases.

    Groups two kinds of utilities: checks and state changes on Docker
    containers (through the ``docker`` SDK), and construction of notification
    payloads derived from an incoming registration event.

    Helper methods whose names start with an underscore are internal.
    """

    def __init__(self):
        pass

    @staticmethod
    def _to_text(value):
        """Return *value* decoded as UTF-8 text when it is a byte string."""
        if isinstance(value, bytes):
            return value.decode("utf-8", "replace")
        return value

    @staticmethod
    def check_for_log(search_for):
        """Return True if the log of the ``prh`` container contains *search_for*.

        The log is consumed as a stream, one chunk at a time.  Chunks and the
        searched text are both normalised to text first, because the Docker
        SDK hands out byte strings while callers usually pass text; mixing the
        two in an ``in`` test raises ``TypeError`` on Python 3.

        NOTE(review): a streamed log appears to follow the container output,
        so this call may block while the container keeps running and the text
        has not shown up yet - confirm against the Docker SDK documentation.

        :param search_for: text (or bytes) to look for in the log
        :return: True on the first chunk containing the text, False when the
                 stream ends without a match
        """
        needle = PrhLibrary._to_text(search_for)
        container = docker.from_env().containers.get('prh')
        for chunk in container.logs(stream=True):
            if needle in PrhLibrary._to_text(chunk).strip():
                return True
        return False

    @staticmethod
    def create_pnf_ready_notification(json_file):
        """Build the compact PNF-ready notification for a registration event.

        Reads ``event.commonEventHeader.sourceName`` and the
        ``oamV4IpAddress`` / ``oamV6IpAddress`` entries of
        ``event.pnfRegistrationFields`` and renders them, in this fixed order
        and without any whitespace, as::

            {"correlationId":"...","ipaddress-v4-oam":"...","ipaddress-v6-oam":"..."}

        Every value is serialised with ``json.dumps`` so characters that need
        escaping stay valid JSON.  A missing (or null) IP address becomes an
        empty string; a missing ``sourceName`` is serialised as ``null``.

        :param json_file: JSON text of the registration event
        :return: the notification as a JSON string
        :raises AttributeError: if ``event``, ``commonEventHeader`` or
                                ``pnfRegistrationFields`` is absent
        """
        event = json.loads(json_file).get("event")
        registration = event.get("pnfRegistrationFields")
        correlation_id = event.get("commonEventHeader").get("sourceName")
        # Both addresses are optional; treat an absent one as empty.
        ipv4 = registration.get("oamV4IpAddress") or ""
        ipv6 = registration.get("oamV6IpAddress") or ""
        # A sequence of pairs (not a dict) keeps the key order fixed on every
        # Python version; create_invalid_notification relies on this layout.
        pairs = (
            ("correlationId", correlation_id),
            ("ipaddress-v4-oam", ipv4),
            ("ipaddress-v6-oam", ipv6),
        )
        members = [json.dumps(key) + ":" + json.dumps(value) for key, value in pairs]
        return "{" + ",".join(members) + "}"

    @staticmethod
    def create_pnf_name(json_file):
        """Return the top-level ``sourceName`` of *json_file*, or None if absent.

        :param json_file: JSON text holding an object
        """
        return json.loads(json_file).get("sourceName")

    @staticmethod
    def _ensure_status(name, status, announcement, transition):
        """Bring container *name* into *status*, then print all statuses.

        :param name: container name
        :param status: Docker status to reach, e.g. ``"running"``
        :param announcement: text printed before the transition is triggered
        :param transition: callable taking the container object and triggering
                           the state change
        """
        client = docker.from_env()

        if not PrhLibrary.is_in_status(client, name, status):
            print(announcement, name)
            transition(client.containers.get(name))
            PrhLibrary.wait_for_status(client, name, status)

        PrhLibrary.print_status(client)

    @staticmethod
    def ensure_container_is_running(name):
        """Start container *name* unless it is already running, and wait for it."""
        PrhLibrary._ensure_status(
            name, "running", "starting container",
            lambda container: container.start())

    @staticmethod
    def ensure_container_is_exited(name):
        """Stop container *name* unless it has already exited, and wait for it."""
        PrhLibrary._ensure_status(
            name, "exited", "stopping container",
            lambda container: container.stop())

    @staticmethod
    def print_status(client):
        """Print the name and status of every container known to *client*."""
        print("containers status")
        for c in client.containers.list(all=True):
            print(c.name, "   ", c.status)

    @staticmethod
    def wait_for_status(client, name, status):
        """Poll every 3 seconds until container *name* reports *status*.

        NOTE: there is no timeout; the loop only ends once the status matches.
        """
        while not PrhLibrary.is_in_status(client, name, status):
            print("waiting for container: ", name, "to be in status: ", status)
            time.sleep(3)

    @staticmethod
    def is_in_status(client, name, status):
        """Return True if exactly one container matches *name* and *status*.

        The name filter is anchored (``^/name$``) so that containers whose
        names merely contain *name* are not counted.
        """
        matching = client.containers.list(
            all=True, filters={"name": "^/" + name + "$", "status": status})
        return len(matching) == 1

    def create_invalid_notification(self, json_file):
        """Return a reformatted variant of the PNF-ready notification.

        Starts from ``create_pnf_ready_notification`` and then adds a space
        after every ``":``, renames the two IP keys to ``oamV4IpAddress`` and
        ``oamV6IpAddress``, and inserts a literal backslash followed by ``n``
        before each closing brace.

        :param json_file: JSON text of the registration event
        """
        return self.create_pnf_ready_notification(json_file).replace("\":", "\": ")\
            .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam", "oamV6IpAddress")\
            .replace("}", "\\n}")