Refactor DMaaP simulator and add tests.
[integration/csit.git] / tests / dcaegen2 / testcases / resources / robot_library / dmaap_test / test_DMaaPSQueue.py
1 from Queue import Queue
2 import pytest
3 from robot_library.dmaap_simulator.DMaaPQueue import DMaaPQueue
4
# Timeout (in seconds) handed to DMaaPQueue.set_deque_event_timeout() by the
# test fixture; kept small so the tests stay fast.
wait_sec_for_dequeing_event = 0.1

# Sample event payload that the tests enqueue and expect to read back unchanged.
test_event = '"topic":{"test":123}'
7
8
class TestDMaaPQueue:
    """Unit tests for ``DMaaPQueue``.

    Every test runs against a new ``DMaaPQueue`` that the autouse fixture
    publishes through the ``dmaap_simulator`` class attribute.
    """

    # Queue under test; replaced before each test by initiate_dmaap_simulator().
    dmaap_simulator = None

    @pytest.fixture(autouse=True, scope="function")
    def initiate_dmaap_simulator(self):
        """Install a fresh ``DMaaPQueue`` on the class before each test."""
        fresh_simulator = DMaaPQueue(Queue())
        TestDMaaPQueue.dmaap_simulator = fresh_simulator
        # NOTE(review): presumably bounds how long deque_event() waits on an
        # empty queue -- confirm against DMaaPQueue.
        fresh_simulator.set_deque_event_timeout(wait_sec_for_dequeing_event)
        yield

    def test_when_queue_is_empty_then_deque_returns_none(self):
        """Dequeuing before anything was enqueued is expected to give None."""
        simulator = TestDMaaPQueue.dmaap_simulator

        # when / then
        assert simulator.deque_event() is None

    def test_when_enque_event_then_dequeue_return_same_event(self):
        """An enqueued event is expected to come back unchanged."""
        simulator = TestDMaaPQueue.dmaap_simulator

        # when
        simulator.enque_event(test_event)

        # then
        assert simulator.deque_event() == test_event

    def test_when_enque_and_dequeue_event_then_deque_return_none(self):
        """After the only event is consumed, the next dequeue should be None."""
        simulator = TestDMaaPQueue.dmaap_simulator

        # when
        simulator.enque_event(test_event)
        simulator.deque_event()  # consume the single queued event

        # then
        assert simulator.deque_event() is None

    def test_when_enque_few_events_and_clean_up_then_dequeu_return_none(self):
        """clean_up_event() is expected to discard every queued event."""
        simulator = TestDMaaPQueue.dmaap_simulator

        # when
        for _ in range(3):
            simulator.enque_event(test_event)
        simulator.clean_up_event()

        # then
        assert simulator.deque_event() is None