class DMaaPQueue(object):
    """Best-effort wrapper around an event queue with a default dequeue timeout.

    The wrapped object is accessed through ``put``, ``get(block, timeout)``,
    ``mutex`` and ``queue`` -- presumably a ``queue.Queue``; confirm against
    the callers.  Every operation is a no-op when the wrapped queue is
    ``None``.
    """

    def __init__(self, event_queue, wait_timeout_sec=25):
        """Store the queue to wrap and the default dequeue timeout.

        Args:
            event_queue: Queue object to wrap; ``None`` disables all
                operations.
            wait_timeout_sec: Timeout handed to ``get`` when ``deque_event``
                is called without an explicit ``wait_sec``.
        """
        self.event_queue = event_queue
        self.wait_timeout_sec = wait_timeout_sec

    def set_deque_event_timeout(self, wait_timeout_sec):
        """Replace the default timeout used by ``deque_event``."""
        self.wait_timeout_sec = wait_timeout_sec

    def clean_up_event(self):
        """Discard every event currently held by the wrapped queue."""
        if self.queue_is_valid():
            # Hold the queue's own lock so the underlying container is not
            # cleared while another thread is in the middle of put()/get().
            with self.event_queue.mutex:
                self.event_queue.queue.clear()

    def enque_event(self, event):
        """Place ``event`` on the queue.

        Returns:
            True when the event was placed on the queue, False when the
            wrapped queue is ``None`` or the put failed.
        """
        event_placed_on_queue = False
        if self.queue_is_valid():
            event_placed_on_queue = self._enque_event(
                event, event_placed_on_queue)
        return event_placed_on_queue

    def _enque_event(self, event, event_placed_on_queue):
        """Put ``event`` on the queue, reporting rather than raising on failure.

        Args:
            event: Object to enqueue.
            event_placed_on_queue: Value returned unchanged if the put fails.

        Returns:
            True on success, otherwise the incoming ``event_placed_on_queue``.
        """
        try:
            self.event_queue.put(event)
            event_placed_on_queue = True
        except Exception as e:
            # Deliberately best-effort: report the failure and let the caller
            # see the unchanged flag instead of propagating the exception.
            print("DMaaP Event enqueue failed: %s" % (e,))
        return event_placed_on_queue

    def deque_event(self, wait_sec=None):
        """Take the next event off the queue, waiting up to ``wait_sec``.

        Args:
            wait_sec: Timeout passed to the queue's ``get``.  ``None`` selects
                the instance default ``wait_timeout_sec``; any other value,
                including ``0``, is used as given.

        Returns:
            The dequeued event, or ``None`` when the wrapped queue is ``None``
            or nothing could be retrieved.
        """
        # Only fall back to the default when the caller gave no timeout;
        # an explicit value must not be overwritten.
        if wait_sec is None:
            wait_sec = self.wait_timeout_sec
        event_from_queue = None
        if self.queue_is_valid():
            event_from_queue = self._deque_event(event_from_queue, wait_sec)
        return event_from_queue

    def _deque_event(self, event_from_queue, wait_sec):
        """Blocking ``get`` on the queue that never raises.

        Args:
            event_from_queue: Value returned unchanged if the get fails.
            wait_sec: Timeout passed to the queue's ``get``.

        Returns:
            The retrieved event, otherwise the incoming ``event_from_queue``.
        """
        try:
            event_from_queue = self.event_queue.get(True, wait_sec)
        except Exception:
            # Any failure of get() is reported with this message, not only
            # an expired timeout.
            print("DMaaP Event dequeue timeout")
        return event_from_queue

    def queue_is_valid(self):
        """Return True when a queue object has been supplied (is not None)."""
        return self.event_queue is not None