Refactor DMaaP simulator and add tests.
[integration/csit.git] / tests / dcaegen2 / testcases / resources / robot_library / DmaapLibrary.py
1 from Queue import Queue
2
3 import robot.api.logger as logger
4 import threading
5 import time
6
7 import DcaeVariables
8 from robot_library.dmaap_simulator import DMaaPServer
9 from robot_library.dmaap_simulator.DMaaPQueue import DMaaPQueue
10
class DmaapLibrary(object):
    """Robot Framework keywords that drive an in-process DMaaP simulator.

    All state is kept on the class and every keyword is a static method, so
    the simulator is shared by everything that uses this library within one
    process. Keywords report their outcome as the strings "true"/"false"
    rather than as Python booleans.
    """

    # Event queue handed to the simulator; None until a start is attempted.
    dmaap_queue = None
    # Object returned by DMaaPServer.create_dmaap_server; None until started.
    dmaap_server = None
    # Thread running dmaap_server.serve_forever; None until started.
    server_thread = None

    def __init__(self):
        pass

    @staticmethod
    def setup_dmaap_server(port_num=3904):
        """Start the simulator on ``port_num``.

        Returns "true" when the server was started and "false" when any
        exception was raised while starting it (the error is logged).
        """
        try:
            DmaapLibrary.start_dmaap_server_on_new_thread(port_num)
        except Exception as e:
            # Report through the Robot logger, like every other keyword here,
            # instead of a bare print.
            logger.warn("DMaaP server failed to start: " + str(e))
            return "false"
        return "true"

    @staticmethod
    def start_dmaap_server_on_new_thread(port_num):
        """Create the queue and server, then serve requests on a new thread.

        ``port_num`` may be an int or a numeric string (Robot passes keyword
        arguments as strings). Blocks for two seconds after starting the
        thread before returning. Any exception raised while creating the
        server propagates to the caller.
        """
        DmaapLibrary.dmaap_queue = DMaaPQueue(Queue())
        DmaapLibrary.dmaap_server = DMaaPServer.create_dmaap_server(
            DmaapLibrary.dmaap_queue, port=int(port_num))
        server_thread = threading.Thread(
            name='DMAAP_HTTPServer',
            target=DmaapLibrary.dmaap_server.serve_forever)
        # Daemon thread: a run that never reaches shutdown_dmaap must not
        # keep the interpreter alive at exit.
        server_thread.daemon = True
        DmaapLibrary.server_thread = server_thread
        server_thread.start()
        logger.console("DMaaP Mockup Sever started")
        DcaeVariables.IsRobotRun = True
        # Fixed pause kept from the original; presumably gives the server
        # time to begin accepting connections.
        time.sleep(2)

    @staticmethod
    def shutdown_dmaap():
        """Stop the simulator.

        Returns "true" after asking the server to shut down, or "false" when
        no server has been created.
        """
        if DmaapLibrary.dmaap_server is None:
            return "false"
        DmaapLibrary.dmaap_server.shutdown()
        logger.console("DMaaP Server shut down")
        # Fixed pause kept from the original.
        time.sleep(3)
        return "true"

    @staticmethod
    def cleanup_ves_events():
        """Empty the event queue.

        Returns "true" when the queue was cleaned, or "false" when the server
        thread has not been started.
        """
        # dmaap_queue is always assigned before server_thread, so a non-None
        # thread implies a usable queue.
        if DmaapLibrary.server_thread is None:
            logger.console("DMaaP server not started yet")
            return "false"
        DmaapLibrary.dmaap_queue.clean_up_event()
        logger.console("DMaaP event queue is cleaned up")
        return "true"

    @staticmethod
    def _find_event(*fragments):
        """Dequeue events until one contains every fragment.

        Returns the matching event, or None when the queue is exhausted or
        no queue exists yet. Events dequeued before the match are consumed.
        The leading underscore keeps this helper from being exposed as a
        Robot keyword.
        """
        queue = DmaapLibrary.dmaap_queue
        if queue is None:
            # Server never started: nothing can have been received.
            return None
        evt_str = queue.deque_event()
        while evt_str is not None:
            if all(fragment in evt_str for fragment in fragments):
                return evt_str
            evt_str = queue.deque_event()
        return None

    @staticmethod
    def dmaap_message_receive_on_topic(evtobj, topic):
        """Check whether a queued event contains both ``evtobj`` and ``topic``.

        Returns 'true' for the first such event and 'false' when the queue
        runs out (or was never created). Non-matching events dequeued along
        the way are discarded.
        """
        evt_str = DmaapLibrary._find_event(evtobj, topic)
        if evt_str is None:
            return 'false'
        logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
        logger.info("On Expected Topic:\n" + topic)
        return 'true'

    @staticmethod
    def dmaap_message_receive(evtobj):
        """Check whether a queued event contains ``evtobj``.

        Returns 'true' for the first such event and 'false' when the queue
        runs out (or was never created). Non-matching events dequeued along
        the way are discarded.
        """
        evt_str = DmaapLibrary._find_event(evtobj)
        if evt_str is None:
            return 'false'
        logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
        return 'true'