1 from Queue import Queue
3 import robot.api.logger as logger
8 from robot_library.dmaap_simulator import DMaaPServer
9 from robot_library.dmaap_simulator.DMaaPQueue import DMaaPQueue
11 class DmaapLibrary(object):
21 def setup_dmaap_server(port_num=3904):
23 DmaapLibrary.start_dmaap_server_on_new_thread(port_num)
25 except Exception as e:
30 def start_dmaap_server_on_new_thread(port_num):
31 DmaapLibrary.dmaap_queue = DMaaPQueue(Queue())
32 DmaapLibrary.dmaap_server = DMaaPServer.create_dmaap_server(DmaapLibrary.dmaap_queue, port=port_num)
33 DmaapLibrary.server_thread = threading.Thread(name='DMAAP_HTTPServer',
34 target=DmaapLibrary.dmaap_server.serve_forever)
35 DmaapLibrary.server_thread.start()
36 logger.console("DMaaP Mockup Sever started")
37 DcaeVariables.IsRobotRun = True
42 if DmaapLibrary.dmaap_server is not None:
43 DmaapLibrary.dmaap_server.shutdown()
44 logger.console("DMaaP Server shut down")
51 def cleanup_ves_events():
52 if DmaapLibrary.server_thread is not None:
53 DmaapLibrary.dmaap_queue.clean_up_event()
54 logger.console("DMaaP event queue is cleaned up")
56 logger.console("DMaaP server not started yet")
60 def dmaap_message_receive_on_topic(evtobj, topic):
62 evt_str = DmaapLibrary.dmaap_queue.deque_event()
63 while evt_str != None:
64 if evtobj in evt_str and topic in evt_str:
65 logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
66 logger.info("On Expected Topic:\n" + topic)
68 evt_str = DmaapLibrary.dmaap_queue.deque_event()
72 def dmaap_message_receive(evtobj):
73 evt_str = DmaapLibrary.dmaap_queue.deque_event()
74 while evt_str != None:
76 logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
78 evt_str = DmaapLibrary.dmaap_queue.deque_event()