private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
- private MessageExpirationListener listener;
private ExecutorService messageExecutor;
+ private LinkedBlockingQueue<QueueMessage> queue;
private int max_thread_size;
private int max_queue_size;
private Util executionQueueUtil;
public QueueManager() {
- //do nothing
+
}
/**
}
}
- public void setListener(MessageExpirationListener listener) {
- this.listener = listener;
- }
-
/**
* Injected by blueprint
*
- * @param executionQueueUtil Util to be set
+ * @param executionQueueUtil
*/
public void setExecutionQueueUtil(Util executionQueueUtil) {
this.executionQueueUtil = executionQueueUtil;
public boolean enqueueTask(QueueMessage queueMessage) {
boolean isEnqueued = true;
try {
- messageExecutor.execute(() -> {
- if (queueMessage.isExpired()) {
- logger.debug("Message expired " + queueMessage.getMessage());
- if (listener != null) {
- listener.onMessageExpiration(queueMessage.getMessage());
- } else {
- logger.warn("Listener not available for expired message ");
- }
- } else {
- queueMessage.getMessage().run();
- }
- });
+ messageExecutor.execute(() -> queueMessage.getMessage().run());
} catch (RejectedExecutionException ree) {
isEnqueued = false;
}
return isEnqueued;
}
+
+ private boolean messageExpired(QueueMessage queueMessage) {
+ return queueMessage.getExpirationTime() != null &&
+ queueMessage.getExpirationTime().getTime() < System.currentTimeMillis();
+ }
}