1 from Queue import Queue
3 import robot.api.logger as logger
8 from robot_library.dmaap_simulator import DMaaPServer
9 from robot_library.dmaap_simulator.DMaaPQueue import DMaaPQueue
# Groups the helpers that start, stop, reset and query a DMaaP mock server.
# Shared state (dmaap_queue, dmaap_server, server_thread) is stored as
# attributes on the class itself, and the visible functions take no `self`.
# NOTE(review): presumably loaded as a Robot Framework keyword library (it logs
# through robot.api.logger) — confirm against the test suites that import it.
11 class DmaapLibrary(object):
# Start the DMaaP mock server on `port_num` (default 3904) by delegating to
# DmaapLibrary.start_dmaap_server_on_new_thread.
# NOTE(review): the embedded line numbering skips 22, 24 and everything after
# 25, so the `try:` that pairs with the `except` below, and the handler body,
# are not visible in this listing — check the full file before editing.
21 def setup_dmaap_server(port_num=3904):
23 DmaapLibrary.start_dmaap_server_on_new_thread(port_num)
# Any Exception raised by the start-up call is caught here; what the handler
# does with `e` is not visible in this listing.
25 except Exception as e:
def start_dmaap_server_on_new_thread(port_num):
    """Create the DMaaP mock server and launch it on its own thread.

    The event queue, the server object and the thread are all stored on the
    DmaapLibrary class (``dmaap_queue``, ``dmaap_server``, ``server_thread``)
    so that the other functions of the class can reach them afterwards.

    :param port_num: port handed to ``DMaaPServer.create_dmaap_server``.
    """
    # Publish each object on the class as soon as it exists, in the same
    # order as before, so a failure part-way leaves the same state behind.
    event_queue = DMaaPQueue(Queue())
    DmaapLibrary.dmaap_queue = event_queue

    mock_server = DMaaPServer.create_dmaap_server(event_queue, port=port_num)
    DmaapLibrary.dmaap_server = mock_server

    # Run the server's serve_forever loop on a dedicated, named thread.
    worker = threading.Thread(target=mock_server.serve_forever,
                              name='DMAAP_HTTPServer')
    DmaapLibrary.server_thread = worker
    worker.start()

    logger.console("DMaaP Mockup Sever started")
    DcaeVariables.IsRobotRun = True
# NOTE(review): the `def` line that owns these statements is not visible (the
# embedded numbering jumps from 37 to 42) — presumably a shutdown function of
# DmaapLibrary; confirm its name and signature in the full file.
# The shutdown is skipped when DmaapLibrary.dmaap_server is None; otherwise
# shutdown() is called on the server and a console message is logged.
42 if DmaapLibrary.dmaap_server is not None:
43 DmaapLibrary.dmaap_server.shutdown()
44 logger.console("DMaaP Server shut down")
# Call reset_dmaap_succesfull_code() on the mock server, then clear the
# event queue via clean_up_event() when a server thread exists.
51 def cleanup_ves_events():
# This call runs before the None check below, so dmaap_server is dereferenced
# unconditionally.
# NOTE(review): if dmaap_server can still be None at this point (server never
# started), this line raises AttributeError before the "not started yet"
# message can be logged — confirm and consider moving it under the check.
52 DmaapLibrary.dmaap_server.reset_dmaap_succesfull_code()
# server_thread is assigned in start_dmaap_server_on_new_thread.
53 if DmaapLibrary.server_thread is not None:
54 DmaapLibrary.dmaap_queue.clean_up_event()
55 logger.console("DMaaP event queue is cleaned up")
# NOTE(review): the embedded numbering skips 56, so whatever separates the two
# outcomes (an `else:` or a `return`) is not visible; this message presumably
# belongs to the path where server_thread is None — confirm.
57 logger.console("DMaaP server not started yet")
# Pull events from DmaapLibrary.dmaap_queue one at a time and look for one
# that contains both `evtobj` and `topic` (tested with the `in` operator).
# NOTE(review): the string concatenations below assume evt_str and topic are
# strings — confirm what deque_event() returns.
61 def dmaap_message_receive_on_topic(evtobj, topic):
63 evt_str = DmaapLibrary.dmaap_queue.deque_event()
# Keep going until deque_event() yields None.
# NOTE(review): `!= None` works, but `is not None` is the idiomatic identity
# test (PEP 8).
64 while evt_str != None:
65 if evtobj in evt_str and topic in evt_str:
66 logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
67 logger.info("On Expected Topic:\n" + topic)
# NOTE(review): the embedded numbering skips 68 — presumably a return on a
# successful match sits here; confirm in the full file.
# Fetch the next event for the following loop iteration.
69 evt_str = DmaapLibrary.dmaap_queue.deque_event()
# Pull events from DmaapLibrary.dmaap_queue one at a time, logging them as the
# expected published event.
73 def dmaap_message_receive(evtobj):
74 evt_str = DmaapLibrary.dmaap_queue.deque_event()
# Keep going until deque_event() yields None.
# NOTE(review): `!= None` works, but `is not None` is the idiomatic identity
# test (PEP 8).
75 while evt_str != None:
# NOTE(review): the embedded numbering skips 76 — as listed, `evtobj` is never
# referenced, so a guard such as a containment test on evt_str is presumably
# missing from this listing; confirm in the full file.
77 logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
# NOTE(review): the embedded numbering also skips 78 — possibly a return on
# match; confirm.
# Fetch the next event for the following loop iteration.
79 evt_str = DmaapLibrary.dmaap_queue.deque_event()
def set_successfull_dmaap_code(code):
    """Forward ``code``, converted with ``int()``, to the mock server.

    :param code: any value ``int()`` accepts; passed on to the server's
        ``set_dmaap_successfull_code``.
    """
    # Resolve the server method first, then convert, matching the original
    # evaluation order.
    apply_code = DmaapLibrary.dmaap_server.set_dmaap_successfull_code
    apply_code(int(code))