2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.appc.executionqueue.impl;
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
28 import org.onap.appc.executionqueue.MessageExpirationListener;
29 import org.onap.appc.executionqueue.helper.Util;
30 import org.onap.appc.executionqueue.impl.object.QueueMessage;
32 import java.util.List;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
39 public class QueueManager {
41 private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
43 private ExecutorService messageExecutor;
44 private LinkedBlockingQueue<QueueMessage> queue;
45 private int max_thread_size;
46 private int max_queue_size;
47 private Util executionQueueUtil;
49 public QueueManager() {
54 * Initialization method used by blueprint
57 max_thread_size = executionQueueUtil.getThreadPoolSize();
58 max_queue_size = executionQueueUtil.getExecutionQueueSize();
59 messageExecutor = new ThreadPoolExecutor(
63 TimeUnit.MILLISECONDS,
64 new LinkedBlockingQueue(max_queue_size),
65 executionQueueUtil.getThreadFactory(true, "appc-dispatcher"),
66 new ThreadPoolExecutor.AbortPolicy());
70 * Destory method used by blueprint
73 // Disable new tasks from being submitted
74 messageExecutor.shutdown();
75 List<Runnable> rejectedRunnables = messageExecutor.shutdownNow();
76 logger.info(String.format("Rejected %d waiting tasks include ", rejectedRunnables.size()));
79 messageExecutor.shutdownNow(); // Cancel currently executing tasks
80 // Wait a while for tasks to respond to being cancelled
81 while (!messageExecutor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
82 logger.debug("QueueManager is being shut down because it still has threads not interrupted");
84 } catch (InterruptedException ie) {
85 // (Re-)Cancel if current thread also interrupted
86 messageExecutor.shutdownNow();
87 // Preserve interrupt status
88 Thread.currentThread().interrupt();
93 * Injected by blueprint
95 * @param executionQueueUtil
97 public void setExecutionQueueUtil(Util executionQueueUtil) {
98 this.executionQueueUtil = executionQueueUtil;
101 public boolean enqueueTask(QueueMessage queueMessage) {
102 boolean isEnqueued = true;
104 messageExecutor.execute(() -> queueMessage.getMessage().run());
105 } catch (RejectedExecutionException ree) {
112 private boolean messageExpired(QueueMessage queueMessage) {
113 return queueMessage.getExpirationTime() != null &&
114 queueMessage.getExpirationTime().getTime() < System.currentTimeMillis();