--- /dev/null
+class DMaaPQueue(object):
+
+ def __init__(self, event_queue, wait_timeout_sec=25):
+ self.event_queue = event_queue
+ self.wait_timeout_sec = wait_timeout_sec
+
+ def set_deque_event_timeout(self, wait_timeout_sec):
+ self.wait_timeout_sec = wait_timeout_sec
+
+ def clean_up_event(self):
+ if self.queue_is_valid():
+ with self.event_queue.mutex:
+ try:
+ self.event_queue.queue.clear()
+ except:
+ pass
+
+ def enque_event(self, event):
+ event_placed_on_queue = False
+ if self.queue_is_valid():
+ event_placed_on_queue = self._enque_event(event, event_placed_on_queue)
+ return event_placed_on_queue
+
+ def _enque_event(self, event, event_placed_on_queue):
+ try:
+ self.event_queue.put(event)
+ event_placed_on_queue = True
+ except Exception as e:
+ print (str(e))
+ return event_placed_on_queue
+
+ def deque_event(self, wait_sec=None):
+ if wait_sec is None:
+ wait_sec = self.wait_timeout_sec
+ event_from_queue = None
+ if self.queue_is_valid():
+ event_from_queue = self._deque_event(event_from_queue, wait_sec)
+ return event_from_queue
+
+ def _deque_event(self, event_from_queue, wait_sec):
+ try:
+ event_from_queue = self.event_queue.get(True, wait_sec)
+ except Exception as e:
+ print("DMaaP Event dequeue timeout")
+ return event_from_queue
+
+ def queue_is_valid(self):
+ return self.event_queue is not None