Refactor DMaaP simulator and add tests.
[integration/csit.git] / tests / dcaegen2 / testcases / resources / robot_library / dmaap_simulator / DMaaPQueue.py
class DMaaPQueue(object):
    """Best-effort wrapper around an event queue used by the DMaaP simulator.

    Every operation is a no-op (returning ``False``/``None`` where a value is
    expected) when no queue was supplied, and errors raised by the underlying
    queue are reported on stdout instead of propagating to the caller.

    NOTE(review): ``event_queue`` is presumably a ``queue.Queue``-like object;
    the methods below rely on its ``put``, ``get``, ``mutex`` and ``queue``
    attributes -- confirm against the caller.
    """

    def __init__(self, event_queue, wait_timeout_sec=25):
        """Store the wrapped queue and the default dequeue timeout.

        :param event_queue: queue object to wrap, or ``None`` for "no queue".
        :param wait_timeout_sec: default number of seconds ``deque_event``
            waits for an event when no explicit timeout is given.
        """
        self.event_queue = event_queue
        self.wait_timeout_sec = wait_timeout_sec

    def set_deque_event_timeout(self, wait_timeout_sec):
        """Replace the default timeout (in seconds) used by ``deque_event``."""
        self.wait_timeout_sec = wait_timeout_sec

    def clean_up_event(self):
        """Discard every event currently held by the queue.

        Does nothing when no queue is set. A failure while clearing is
        printed and otherwise ignored, so clean-up never raises because of it.
        """
        if not self.queue_is_valid():
            return
        # Hold the queue's own lock so the clear does not interleave with a
        # concurrent put/get on the same queue.
        with self.event_queue.mutex:
            try:
                self.event_queue.queue.clear()
            except Exception as e:
                # Was a bare ``except: pass``; ``Exception`` keeps the
                # best-effort behaviour without swallowing KeyboardInterrupt
                # or SystemExit, and the error is no longer hidden.
                print(str(e))

    def enque_event(self, event):
        """Put ``event`` on the queue.

        :param event: object to enqueue.
        :return: ``True`` if the event was placed on the queue, ``False`` if
            there is no queue or the put failed.
        """
        event_placed_on_queue = False
        if self.queue_is_valid():
            event_placed_on_queue = self._enque_event(event, event_placed_on_queue)
        return event_placed_on_queue

    def _enque_event(self, event, event_placed_on_queue):
        """Try to put ``event`` on the queue.

        :param event: object to enqueue.
        :param event_placed_on_queue: value returned unchanged if the put
            fails.
        :return: ``True`` on success, otherwise ``event_placed_on_queue``.
        """
        try:
            self.event_queue.put(event)
            event_placed_on_queue = True
        except Exception as e:
            print(str(e))
        return event_placed_on_queue

    def deque_event(self, wait_sec=None):
        """Take the next event from the queue, waiting up to a timeout.

        :param wait_sec: seconds to wait for an event; ``None`` selects the
            instance default ``wait_timeout_sec``.
        :return: the dequeued event, or ``None`` if there is no queue, the
            wait timed out, or the underlying ``get`` failed.
        """
        if wait_sec is None:
            wait_sec = self.wait_timeout_sec
        event_from_queue = None
        if self.queue_is_valid():
            event_from_queue = self._deque_event(event_from_queue, wait_sec)
        return event_from_queue

    def _deque_event(self, event_from_queue, wait_sec):
        """Blocking ``get`` on the queue with a timeout.

        :param event_from_queue: value returned unchanged if nothing could be
            dequeued.
        :param wait_sec: seconds to block waiting for an event.
        :return: the dequeued event, otherwise ``event_from_queue``.
        """
        # Function-scope import so the module gains no new top-level
        # dependency; the fallback covers Python 2, where the module is
        # named ``Queue``.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty
        try:
            event_from_queue = self.event_queue.get(True, wait_sec)
        except Empty:
            # Only an empty queue after the wait is a genuine timeout.
            print("DMaaP Event dequeue timeout")
        except Exception as e:
            # Any other failure (e.g. an invalid timeout value) used to be
            # misreported as a timeout; report what actually went wrong.
            print("DMaaP Event dequeue failed: " + str(e))
        return event_from_queue

    def queue_is_valid(self):
        """Return ``True`` when a queue object has been set (is not ``None``)."""
        return self.event_queue is not None