3 from mock import MagicMock
5 sys.modules['robot'] = MagicMock()
6 sys.modules['robot.api'] = MagicMock()
7 sys.modules['robot.api.logger'] = MagicMock()
8 from robot_library.DmaapLibrary import DmaapLibrary
# Timeout handed to dmaap_queue.set_deque_event_timeout() by the class fixture;
# the name suggests the unit is seconds.
wait_sec_for_dequeing_event = 0.1

# JSON body of the event the tests look for.
test_event = '{"test":"123"}'

# Queue entry in the form "<topic>":<event>.
# NOTE(review): test_topic is not defined in the visible part of this file --
# confirm it is declared before this line.
test_message = '"{}":{}'.format(test_topic, test_event)
class TestDMaaPSimulator:
    """Tests for the DmaapLibrary keywords against a server started by the fixtures.

    Events are put on ``DmaapLibrary.dmaap_queue`` directly and the
    ``dmaap_message_receive*`` keywords are expected to answer with the
    strings ``'true'`` or ``'false'``.
    """

    @pytest.fixture(autouse=True, scope="class")
    def initiate_dmaap_simulator(self):
        """Start the DMaaP server once for the class and shut it down afterwards."""
        DmaapLibrary.setup_dmaap_server()
        # Short dequeue timeout so the negative tests do not wait long.
        DmaapLibrary.dmaap_queue.set_deque_event_timeout(wait_sec_for_dequeing_event)
        # Hand control to the tests. Without this yield the shutdown below
        # would run during fixture setup, before any test has executed.
        yield
        assert DmaapLibrary.shutdown_dmaap() == "true"

    @pytest.fixture(autouse=True, scope="function")
    def clear_dmaap_simulator(self):
        """Call ``cleanup_ves_events()`` before every test.

        Presumably this empties the queue so that events from one test
        cannot satisfy another -- see the "queue was cleared" test below.
        """
        DmaapLibrary.cleanup_ves_events()

    def test_start_stop_dmaap_server(self):
        """After setup, the queue, server and server thread attributes are all set."""
        assert DmaapLibrary.dmaap_queue is not None
        assert DmaapLibrary.dmaap_server is not None
        assert DmaapLibrary.server_thread is not None

    def test_dmaap_server_returns_true_when_event_is_present_on_queue(self):
        """An enqueued message makes ``dmaap_message_receive`` return ``'true'``."""
        DmaapLibrary.dmaap_queue.enque_event(test_message)

        assert DmaapLibrary.dmaap_message_receive(test_event) == 'true'

    def test_dmaap_server_returns_true_when_event_is_present_on_given_topic_on_queue(self):
        """An enqueued message is also found when the topic is given explicitly."""
        DmaapLibrary.dmaap_queue.enque_event(test_message)

        assert DmaapLibrary.dmaap_message_receive_on_topic(test_event, test_topic) == 'true'

    def test_dmaap_server_returns_timeout_when_event_is_not_present_on_queue(self):
        """With nothing enqueued, ``dmaap_message_receive`` returns ``'false'``."""
        assert DmaapLibrary.dmaap_message_receive(test_event) == 'false'

    def test_dmaap_server_returns_false_when_queue_was_cleared(self):
        """Messages enqueued before ``cleanup_ves_events()`` are no longer received."""
        for _ in range(3):
            DmaapLibrary.dmaap_queue.enque_event(test_message)
        DmaapLibrary.cleanup_ves_events()

        assert DmaapLibrary.dmaap_message_receive_on_topic(test_event, test_topic) == 'false'