8dbdc5a3d47577b170d746a030e40abc8b337fc2
[integration/csit.git] / tests / dcaegen2-services-bbs-event-processor / bbs-testcases / resources / BbsLibrary.py
1 import json
2
3 import docker
4 import time
5 from docker.utils.json_stream import json_stream
6 from collections import OrderedDict
7
8
9 class BbsLibrary(object):
10
11     def __init__(self):
12         pass
13
14     @staticmethod
15     def check_for_log(search_for):
16         client = docker.from_env()
17         container = client.containers.get('bbs')
18
19         alog = container.logs(stream=False, tail=1000)
20         try:
21             alog = alog.decode()
22         except AttributeError:
23             pass
24
25         found = alog.find(search_for)
26         if found != -1:
27             return True
28         else:
29             return False
30
31     @staticmethod
32     def create_pnf_name_from_auth(json_file):
33         json_to_python = json.loads(json_file)
34         correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
35         return correlation_id
36
37     @staticmethod
38     def get_invalid_auth_elements(json_file):
39         """
40         Get the correlationId, oldState, newState, stateInterface, macAddress, swVersion elements
41         from the invalid message and place the elements into a JSON object (string) as fields for comparision
42         """
43         json_to_python = json.loads(json_file)
44         correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName")
45         oldState = json_to_python.get("event").get("stateChangeFields").get("oldState")
46         newState = json_to_python.get("event").get("stateChangeFields").get("newState")
47         stateInterface = json_to_python.get("event").get("stateChangeFields").get("stateInterface")
48         macAddress = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("macAddress")
49         swVersion = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("swVersion")
50         if swVersion is None:
51             swVersion = ""
52         
53         inv_fields = OrderedDict()
54
55         #inv_fields = dict()
56         inv_fields['correlationId'] = correlation_id
57         inv_fields['oldState'] = oldState
58         inv_fields['newState'] = newState
59         inv_fields['stateInterface'] = stateInterface
60         inv_fields['macAddress'] = macAddress
61         inv_fields['swVersion'] = swVersion
62         
63         # Transform the dictionary to JSON string
64         json_str = json.dumps(inv_fields)
65         
66         # Need to remove spaces between elements
67         json_str = json_str.replace(', ', ',')
68         return json_str
69
70     @staticmethod
71     def get_invalid_update_elements(json_file):
72         """
73         Get the correlationId, attachment-point, remote-id, cvlan, svlan, elements
74         from the invalid message and place the elements into a JSON object (string) as fields for comparision
75         """
76         json_to_python = json.loads(json_file)
77         correlation_id = json_to_python.get("correlationId")
78         attachmentPoint = json_to_python.get("additionalFields").get("attachment-point")
79         remoteId = json_to_python.get("additionalFields").get("remote-id")
80         cvlan = json_to_python.get("additionalFields").get("cvlan")
81         svlan = json_to_python.get("additionalFields").get("svlan")
82         
83         inv_fields = OrderedDict()
84         #inv_fields = dict()
85         inv_fields['correlationId'] = correlation_id
86         inv_fields['attachment-point'] = attachmentPoint
87         inv_fields['remote-id'] = remoteId
88         inv_fields['cvlan'] = cvlan
89         inv_fields['svlan'] = svlan
90         
91         # Transform the dictionary to JSON string
92         json_str = json.dumps(inv_fields)
93         
94         # Need to remove spaces between elements
95         json_str = json_str.replace(', ', ',')
96         return json_str
97
98     @staticmethod
99     def compare_policy(dmaap_policy, json_policy):
100         resp = False
101         try:
102             python_policy = json.loads(json_policy).pop()
103         except:
104             python_policy = ""
105         
106         try:
107             python_dmaap_policy = json.loads(dmaap_policy)
108         except:
109             python_dmaap_policy = ""
110
111         try:
112             d_policy = python_dmaap_policy.get("policyName")
113         except:
114             d_policy = ""
115
116         try:
117             j_policy = python_policy.get("policyName")
118         except:
119             return "False"
120         
121         resp = "False"
122         if (d_policy == j_policy):
123             resp = "True"
124         return resp
125
126     @staticmethod
127     def create_pnf_name_from_update(json_file):
128         json_to_python = json.loads(json_file)
129         correlation_id = json_to_python.get("correlationId")
130         return correlation_id
131
132     @staticmethod
133     def ensure_container_is_running(name):
134         
135         client = docker.from_env()
136
137         if not BbsLibrary.is_in_status(client, name, "running"):
138             print ("starting container", name)
139             container = client.containers.get(name)
140             container.start()
141             BbsLibrary.wait_for_status(client, name, "running")
142
143         BbsLibrary.print_status(client)
144
145     @staticmethod
146     def ensure_container_is_exited(name):
147
148         client = docker.from_env()
149
150         if not BbsLibrary.is_in_status(client, name, "exited"):
151             print ("stopping container", name)
152             container = client.containers.get(name)
153             container.stop()
154             BbsLibrary.wait_for_status(client, name, "exited")
155
156         BbsLibrary.print_status(client)
157
158     @staticmethod
159     def print_status(client):
160         print("containers status")
161         for c in client.containers.list(all=True):
162             print(c.name, "   ", c.status)
163
164     @staticmethod
165     def wait_for_status(client, name, status):
166         while not BbsLibrary.is_in_status(client, name, status):
167             print ("waiting for container: ", name, "to be in status: ", status)
168             time.sleep(3)
169
170     @staticmethod
171     def is_in_status(client, name, status):
172         return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
173