Refactor DMaaP simulator and add tests.
[integration/csit.git] / tests / dcaegen2 / testcases / resources / robot_library / dmaap_test / test_DMaaPSimulator.py
1 import sys
2 import pytest
3 from mock import MagicMock
4
5 sys.modules['robot'] = MagicMock()
6 sys.modules['robot.api'] = MagicMock()
7 sys.modules['robot.api.logger'] = MagicMock()
8 from robot_library.DmaapLibrary import DmaapLibrary
9
10 wait_sec_for_dequeing_event = 0.1
11 test_event = "{\"test\":\"123\"}"
12 test_topic = "topic"
13 test_message = "\"" + test_topic + "\":" + test_event
14
15
16 class TestDMaaPSimulator:
17
18     @pytest.fixture(autouse=True, scope="class")
19     def initiate_dmaap_simulator(self):
20         DmaapLibrary.setup_dmaap_server()
21         DmaapLibrary.dmaap_queue.set_deque_event_timeout(wait_sec_for_dequeing_event)
22         yield
23         assert DmaapLibrary.shutdown_dmaap() == "true"
24
25     @pytest.fixture(autouse=True, scope="function")
26     def clear_dmaap_simulator(self):
27         yield
28         DmaapLibrary.cleanup_ves_events()
29
30     def test_start_stop_dmaap_server(self):
31         # when / then
32         assert DmaapLibrary.dmaap_queue is not None
33         assert DmaapLibrary.dmaap_server is not None
34         assert DmaapLibrary.server_thread is not None
35
36     def test_dmaap_server_returns_true_when_event_is_present_on_queue(self):
37         # when
38         DmaapLibrary.dmaap_queue.enque_event(test_message)
39
40         # then
41         assert DmaapLibrary.dmaap_message_receive(test_event) == 'true'
42
43     def test_dmaap_server_returns_true_when_event_is_present_on_given_topic_on_queue(self):
44         # when
45         DmaapLibrary.dmaap_queue.enque_event(test_message)
46
47         # then
48         assert DmaapLibrary.dmaap_message_receive_on_topic(test_event, test_topic) == 'true'
49
50     def test_dmaap_server_returns_timeout_when_event_is_not_present_on_queue(self):
51         # when / then
52         assert DmaapLibrary.dmaap_message_receive(test_event) == 'false'
53
54     def test_dmaap_server_returns_false_when_queue_was_cleared(self):
55         # when
56         DmaapLibrary.dmaap_queue.enque_event(test_message)
57         DmaapLibrary.dmaap_queue.enque_event(test_message)
58         DmaapLibrary.dmaap_queue.enque_event(test_message)
59         DmaapLibrary.cleanup_ves_events()
60
61         # then
62         assert DmaapLibrary.dmaap_message_receive_on_topic(test_event, test_topic) == 'false'