Third part of onap rename
[appc.git] / appc-dispatcher / appc-dispatcher-common / execution-queue-management-lib / src / main / java / org / onap / appc / executionqueue / impl / QueueManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.executionqueue.impl;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import org.onap.appc.executionqueue.MessageExpirationListener;
30 import org.onap.appc.executionqueue.helper.Util;
31 import org.onap.appc.executionqueue.impl.object.QueueMessage;
32
33 import java.util.List;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.ThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39
40 public class QueueManager {
41
42     private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
43
44     private MessageExpirationListener listener;
45     private ExecutorService messageExecutor;
46     private int max_thread_size;
47     private int max_queue_size;
48     private Util executionQueueUtil;
49
50     public QueueManager() {
51         //do nothing
52     }
53
54     /**
55      * Initialization method used by blueprint
56      */
57     public void init() {
58         max_thread_size = executionQueueUtil.getThreadPoolSize();
59         max_queue_size = executionQueueUtil.getExecutionQueueSize();
60         messageExecutor = new ThreadPoolExecutor(
61             max_thread_size,
62             max_thread_size,
63             0L,
64             TimeUnit.MILLISECONDS,
65             new LinkedBlockingQueue(max_queue_size),
66             executionQueueUtil.getThreadFactory(true, "appc-dispatcher"),
67             new ThreadPoolExecutor.AbortPolicy());
68     }
69
70     /**
71      * Destory method used by blueprint
72      */
73     public void stop() {
74         // Disable new tasks from being submitted
75         messageExecutor.shutdown();
76         List<Runnable> rejectedRunnables = messageExecutor.shutdownNow();
77         logger.info(String.format("Rejected %d waiting tasks include ", rejectedRunnables.size()));
78
79         try {
80             messageExecutor.shutdownNow(); // Cancel currently executing tasks
81             // Wait a while for tasks to respond to being cancelled
82             while (!messageExecutor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
83                 logger.debug("QueueManager is being shut down because it still has threads not interrupted");
84             }
85         } catch (InterruptedException ie) {
86             // (Re-)Cancel if current thread also interrupted
87             messageExecutor.shutdownNow();
88             // Preserve interrupt status
89             Thread.currentThread().interrupt();
90         }
91     }
92
93     public void setListener(MessageExpirationListener listener) {
94         this.listener = listener;
95     }
96
97     /**
98      * Injected by blueprint
99      *
100      * @param executionQueueUtil Util to be set
101      */
102     public void setExecutionQueueUtil(Util executionQueueUtil) {
103         this.executionQueueUtil = executionQueueUtil;
104     }
105
106     public boolean enqueueTask(QueueMessage queueMessage) {
107         boolean isEnqueued = true;
108         try {
109             messageExecutor.execute(() -> {
110                 if (queueMessage.isExpired()) {
111                     logger.debug("Message expired " + queueMessage.getMessage());
112                     if (listener != null) {
113                         listener.onMessageExpiration(queueMessage.getMessage());
114                     } else {
115                         logger.warn("Listener not available for expired message ");
116                     }
117                 } else {
118                     queueMessage.getMessage().run();
119                 }
120             });
121         } catch (RejectedExecutionException ree) {
122             isEnqueued = false;
123         }
124
125         return isEnqueued;
126     }
127 }