CSIT tests update for Synchronous VES collector
[integration/csit.git] / tests / dcaegen2 / testcases / resources / robot_library / DmaapLibrary.py
1 from Queue import Queue
2
3 import robot.api.logger as logger
4 import threading
5 import time
6
7 import DcaeVariables
8 from robot_library.dmaap_simulator import DMaaPServer
9 from robot_library.dmaap_simulator.DMaaPQueue import DMaaPQueue
10
11 class DmaapLibrary(object):
12
13     dmaap_queue = None
14     dmaap_server = None
15     server_thread = None
16
17     def __init__(self):
18         pass
19
20     @staticmethod
21     def setup_dmaap_server(port_num=3904):
22         try:
23             DmaapLibrary.start_dmaap_server_on_new_thread(port_num)
24             return "true"
25         except Exception as e:
26             print (str(e))
27             return "false"
28
29     @staticmethod
30     def start_dmaap_server_on_new_thread(port_num):
31         DmaapLibrary.dmaap_queue = DMaaPQueue(Queue())
32         DmaapLibrary.dmaap_server = DMaaPServer.create_dmaap_server(DmaapLibrary.dmaap_queue, port=port_num)
33         DmaapLibrary.server_thread = threading.Thread(name='DMAAP_HTTPServer',
34                                                       target=DmaapLibrary.dmaap_server.serve_forever)
35         DmaapLibrary.server_thread.start()
36         logger.console("DMaaP Mockup Sever started")
37         DcaeVariables.IsRobotRun = True
38         time.sleep(2)
39
40     @staticmethod
41     def shutdown_dmaap():
42         if DmaapLibrary.dmaap_server is not None:
43             DmaapLibrary.dmaap_server.shutdown()
44             logger.console("DMaaP Server shut down")
45             time.sleep(3)
46             return "true"
47         else:
48             return "false"
49
50     @staticmethod
51     def cleanup_ves_events():
52         DmaapLibrary.dmaap_server.reset_dmaap_succesfull_code()
53         if DmaapLibrary.server_thread is not None:
54             DmaapLibrary.dmaap_queue.clean_up_event()
55             logger.console("DMaaP event queue is cleaned up")
56             return "true"
57         logger.console("DMaaP server not started yet")
58         return "false"
59
60     @staticmethod
61     def dmaap_message_receive_on_topic(evtobj, topic):
62
63         evt_str = DmaapLibrary.dmaap_queue.deque_event()
64         while evt_str != None:
65             if evtobj in evt_str and topic in evt_str:
66                 logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
67                 logger.info("On Expected Topic:\n" + topic)
68                 return 'true'
69             evt_str = DmaapLibrary.dmaap_queue.deque_event()
70         return 'false'
71
72     @staticmethod
73     def dmaap_message_receive(evtobj):
74         evt_str = DmaapLibrary.dmaap_queue.deque_event()
75         while evt_str != None:
76             if evtobj in evt_str:
77                 logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
78                 return 'true'
79             evt_str = DmaapLibrary.dmaap_queue.deque_event()
80         return 'false'
81
82     @staticmethod
83     def set_successfull_dmaap_code(code):
84         DmaapLibrary.dmaap_server.set_dmaap_successfull_code(int(code))