8bad66bea3f8c2738109efe5ec0132ecccb8d7f7
[appc.git] / appc-dispatcher / appc-dispatcher-common / execution-queue-management-lib / src / main / java / org / onap / appc / executionqueue / impl / QueueManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.appc.executionqueue.impl;
25
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
28 import org.onap.appc.executionqueue.MessageExpirationListener;
29 import org.onap.appc.executionqueue.helper.Util;
30 import org.onap.appc.executionqueue.impl.object.QueueMessage;
31
32 import java.util.List;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38
39 public class QueueManager {
40
41     private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
42
43     private ExecutorService messageExecutor;
44     private LinkedBlockingQueue<QueueMessage> queue;
45     private int max_thread_size;
46     private int max_queue_size;
47     private Util executionQueueUtil;
48
49     public QueueManager() {
50
51     }
52
53     /**
54      * Initialization method used by blueprint
55      */
56     public void init() {
57         max_thread_size = executionQueueUtil.getThreadPoolSize();
58         max_queue_size = executionQueueUtil.getExecutionQueueSize();
59         messageExecutor = new ThreadPoolExecutor(
60             max_thread_size,
61             max_thread_size,
62             0L,
63             TimeUnit.MILLISECONDS,
64             new LinkedBlockingQueue(max_queue_size),
65             executionQueueUtil.getThreadFactory(true, "appc-dispatcher"),
66             new ThreadPoolExecutor.AbortPolicy());
67     }
68
69     /**
70      * Destory method used by blueprint
71      */
72     public void stop() {
73         // Disable new tasks from being submitted
74         messageExecutor.shutdown();
75         List<Runnable> rejectedRunnables = messageExecutor.shutdownNow();
76         logger.info(String.format("Rejected %d waiting tasks include ", rejectedRunnables.size()));
77
78         try {
79             messageExecutor.shutdownNow(); // Cancel currently executing tasks
80             // Wait a while for tasks to respond to being cancelled
81             while (!messageExecutor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
82                 logger.debug("QueueManager is being shut down because it still has threads not interrupted");
83             }
84         } catch (InterruptedException ie) {
85             // (Re-)Cancel if current thread also interrupted
86             messageExecutor.shutdownNow();
87             // Preserve interrupt status
88             Thread.currentThread().interrupt();
89         }
90     }
91
92     /**
93      * Injected by blueprint
94      *
95      * @param executionQueueUtil
96      */
97     public void setExecutionQueueUtil(Util executionQueueUtil) {
98         this.executionQueueUtil = executionQueueUtil;
99     }
100
101     public boolean enqueueTask(QueueMessage queueMessage) {
102         boolean isEnqueued = true;
103         try {
104             messageExecutor.execute(() -> queueMessage.getMessage().run());
105         } catch (RejectedExecutionException ree) {
106             isEnqueued = false;
107         }
108
109         return isEnqueued;
110     }
111
112     private boolean messageExpired(QueueMessage queueMessage) {
113         return queueMessage.getExpirationTime() != null &&
114             queueMessage.getExpirationTime().getTime() < System.currentTimeMillis();
115     }
116 }