PRH BBS tests - part 2
[integration/csit.git] / tests / dcaegen2 / prh-testcases / resources / PrhLibrary.py
1 import json
2
3 import docker
4 import time
5
6
class PrhLibrary(object):
    """Helper keywords for the PRH test cases.

    Every method is a ``@staticmethod``; the class carries no state.  The
    methods fall into two groups:

    * builders that turn a VES event (JSON text) into the hand-assembled
      string fragments the tests compare against, and
    * thin wrappers around the Docker SDK used to inspect, start and stop
      containers.

    NOTE(review): presumably loaded as a Robot Framework library -- confirm
    against the test suites that import it.
    """

    def __init__(self):
        pass

    @staticmethod
    def _as_text(value):
        """Return *value* as text, decoding UTF-8 ``bytes`` when necessary.

        Undecodable bytes are replaced rather than raising, because the value
        is only used for substring matching.
        """
        if isinstance(value, bytes):
            return value.decode("utf-8", "replace")
        return value

    @staticmethod
    def find_log_entry(search_for):
        """Return True if a log line of the ``prh`` container contains *search_for*.

        The log stream yields ``bytes``; both the pattern and each line are
        normalised to text first so that a ``str`` pattern does not raise
        ``TypeError`` on Python 3.

        NOTE(review): with ``stream=True`` the Docker SDK appears to follow
        the log, so when no line matches this may only return False once the
        stream ends (e.g. the container stops) -- confirm.

        :param search_for: text (or bytes) to look for inside a log line.
        :return: True on the first matching line, False if the stream ends
            without a match.
        """
        print(type(search_for))
        client = docker.from_env()
        container = client.containers.get('prh')
        print("Check for log searches for pattern: ", search_for)
        pattern = PrhLibrary._as_text(search_for)
        for line in container.logs(stream=True):
            print("Check for log analysis line: ", line)
            if pattern in PrhLibrary._as_text(line).strip():
                return True
        return False

    @staticmethod
    def create_invalid_notification(json_file):
        """Build the expected textual form of an invalid notification.

        The result is assembled by hand (not via ``json.dumps`` of a dict):
        every ``":`` is followed by a space, a trailing comma is removed and
        the text ends with a literal backslash, ``n`` and ``}``.

        :param json_file: VES event as JSON text.
        :return: the assembled string.
        """
        event = json.loads(json_file)
        registration_field_names = (
            "oamV4IpAddress",
            "oamV6IpAddress",
            "serialNumber",
            "vendorName",
            "modelNumber",
            "unitType",
        )

        parts = [PrhLibrary.extract_correlation_id_value(event, "correlationId")]
        for field_name in registration_field_names:
            parts.append(
                PrhLibrary.extract_value_from_pnfRegistrationFields(event, field_name, field_name))
        # These two members are always emitted empty.
        parts.append('"nfNamingCode":"",')
        parts.append('"softwareVersion":"",')
        parts.append(PrhLibrary.extract_additional_fields(event))

        str_json = '{' + ''.join(parts)
        # json.dumps() quotes and escapes the text; dropping every backslash
        # and the surrounding quotes undoes the quote escaping again (and also
        # removes any backslash that was part of the data).
        return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}'

    @staticmethod
    def create_pnf_ready_notification_as_pnf_ready(json_file):
        """Build the expected PNF_READY notification text for an event.

        Contains ``correlationId`` and, when the event has non-empty
        additional fields, an ``additionalFields`` object.

        :param json_file: VES event as JSON text.
        :return: the assembled JSON-like string.
        """
        event = json.loads(json_file)
        correlation_id = PrhLibrary.extract_correlation_id_value(event, "correlationId")
        additional_fields = PrhLibrary.extract_additional_fields_value(event)

        str_json = '{' + correlation_id + additional_fields
        # Same escape/unescape round trip as in create_invalid_notification.
        return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1]

    @staticmethod
    def extract_additional_fields_value(content):
        """Return the ``additionalFields`` fragment, or ``""`` when there is none.

        A missing key, a JSON ``null`` and an empty object all yield ``""``.
        """
        fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
        if not fields:
            return ""
        return PrhLibrary.build_additional_fields_json(fields)

    @staticmethod
    def extract_additional_fields(content):
        """Return the ``additionalFields`` fragment, or an explicit ``null`` member.

        A missing key or a JSON ``null`` yields ``"additionalFields":null``
        (without trailing comma); an empty object is still rendered as
        ``"additionalFields":{},``.
        """
        fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
        if fields is None or fields == []:
            return '"additionalFields":null'
        return PrhLibrary.build_additional_fields_json(fields)

    @staticmethod
    def get_additional_fields_as_key_value_pairs(content):
        """Return ``event.pnfRegistrationFields.additionalFields`` or ``[]`` if absent.

        The stored value is returned unchanged when the key exists, so it may
        be ``None`` when the event carries an explicit JSON ``null``.
        """
        registration_fields = content["event"]["pnfRegistrationFields"]
        return registration_fields.get("additionalFields", [])

    @staticmethod
    def build_additional_fields_json(fields):
        """Render *fields* as ``"additionalFields":{"k":"v",...},``.

        Values are concatenated as-is, so they must be strings.  The returned
        fragment always ends with a comma.
        """
        members = ['"' + key + '":"' + fields.get(key) + '"' for key in fields]
        return '"additionalFields":{' + ','.join(members) + '},'

    @staticmethod
    def extract_value_from_pnfRegistrationFields(content, name, key):
        """Return ``"<name>":"<value>",`` for a ``pnfRegistrationFields`` entry.

        The value is the empty string when *key* is not present.
        """
        registration_fields = content["event"]["pnfRegistrationFields"]
        value = registration_fields[key] if key in registration_fields else ""
        return '"' + name + '":"' + value + '",'

    @staticmethod
    def extract_correlation_id_value(content, name):
        """Return ``"<name>":"<sourceName>",`` taken from ``commonEventHeader``.

        The value is the empty string when ``sourceName`` is not present.
        """
        header = content["event"]["commonEventHeader"]
        value = header["sourceName"] if "sourceName" in header else ""
        return '"' + name + '":"' + value + '",'

    @staticmethod
    def create_pnf_name(json_file):
        """Return the event's ``sourceName`` followed by ``",``.

        When ``sourceName`` is missing only ``",`` is returned.

        NOTE(review): the trailing ``",`` looks like a leftover from the
        fragment builders; it is kept because callers outside this file may
        rely on the exact value -- confirm before changing.
        """
        header = json.loads(json_file)["event"]["commonEventHeader"]
        if "sourceName" in header:
            return header["sourceName"] + '",'
        return '",'

    @staticmethod
    def ensure_container_is_running(name):
        """Start container *name* unless it is already running, then print all statuses."""
        client = docker.from_env()

        if not PrhLibrary.is_in_status(client, name, "running"):
            print("starting container", name)
            client.containers.get(name).start()
            PrhLibrary.wait_for_status(client, name, "running")

        PrhLibrary.print_status(client)

    @staticmethod
    def ensure_container_is_exited(name):
        """Stop container *name* unless it has already exited, then print all statuses."""
        client = docker.from_env()

        if not PrhLibrary.is_in_status(client, name, "exited"):
            print("stopping container", name)
            client.containers.get(name).stop()
            PrhLibrary.wait_for_status(client, name, "exited")

        PrhLibrary.print_status(client)

    @staticmethod
    def print_status(client):
        """Print name and status of every container known to *client*."""
        print("containers status")
        for container in client.containers.list(all=True):
            print(container.name, "   ", container.status)

    @staticmethod
    def wait_for_status(client, name, status):
        """Poll every 3 seconds until container *name* reports *status*.

        There is no timeout: the call does not return while the status is
        never reached.
        """
        while not PrhLibrary.is_in_status(client, name, status):
            print("waiting for container: ", name, "to be in status: ", status)
            time.sleep(3)

    @staticmethod
    def is_in_status(client, name, status):
        """Return True if exactly one container matches *name* and *status*.

        *name* is embedded between ``^/`` and ``$`` in the name filter and is
        not escaped.
        """
        name_filter = "^/" + name + "$"
        matches = client.containers.list(all=True, filters={"name": name_filter, "status": status})
        return len(matches) == 1