Include impacted changes for APPC-346,APPC-348
[appc.git] / appc-dispatcher / appc-dispatcher-common / execution-queue-management-lib / src / main / java / org / onap / appc / executionqueue / impl / QueueManager.java
index db0e3d4..c33c660 100644 (file)
@@ -41,14 +41,14 @@ public class QueueManager {
 
     private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
 
-    private MessageExpirationListener listener;
     private ExecutorService messageExecutor;
+    private LinkedBlockingQueue<QueueMessage> queue;
     private int max_thread_size;
     private int max_queue_size;
     private Util executionQueueUtil;
 
     public QueueManager() {
-        //do nothing
+
     }
 
     /**
@@ -90,14 +90,10 @@ public class QueueManager {
         }
     }
 
-    public void setListener(MessageExpirationListener listener) {
-        this.listener = listener;
-    }
-
     /**
      * Injected by blueprint
      *
-     * @param executionQueueUtil Util to be set
+     * @param executionQueueUtil
      */
     public void setExecutionQueueUtil(Util executionQueueUtil) {
         this.executionQueueUtil = executionQueueUtil;
@@ -106,22 +102,16 @@ public class QueueManager {
     public boolean enqueueTask(QueueMessage queueMessage) {
         boolean isEnqueued = true;
         try {
-            messageExecutor.execute(() -> {
-                if (queueMessage.isExpired()) {
-                    logger.debug("Message expired " + queueMessage.getMessage());
-                    if (listener != null) {
-                        listener.onMessageExpiration(queueMessage.getMessage());
-                    } else {
-                        logger.warn("Listener not available for expired message ");
-                    }
-                } else {
-                    queueMessage.getMessage().run();
-                }
-            });
+            messageExecutor.execute(() -> queueMessage.getMessage().run());
         } catch (RejectedExecutionException ree) {
             isEnqueued = false;
         }
 
         return isEnqueued;
     }
+
+    private boolean messageExpired(QueueMessage queueMessage) {
+        return queueMessage.getExpirationTime() != null &&
+            queueMessage.getExpirationTime().getTime() < System.currentTimeMillis();
+    }
 }