b78f399e0e95decf096237ee94bb0af55ce060d6
[appc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.openecomp.appc.executionqueue.impl;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import org.openecomp.appc.executionqueue.MessageExpirationListener;
30 import org.openecomp.appc.executionqueue.helper.Util;
31 import org.openecomp.appc.executionqueue.impl.object.QueueMessage;
32
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38
39 public class QueueManager {
40
41     private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
42
43     private MessageExpirationListener listener;
44     private ExecutorService messageExecutor;
45     private int max_thread_size;
46     private int max_queue_size;
47     private Util executionQueueUtil;
48
49     public QueueManager() {
50         //do nothing
51     }
52
53     /**
54      * Initialization method used by blueprint
55      */
56     public void init() {
57         max_thread_size = executionQueueUtil.getThreadPoolSize();
58         max_queue_size = executionQueueUtil.getExecutionQueueSize();
59         messageExecutor = new ThreadPoolExecutor(
60             max_thread_size,
61             max_thread_size,
62             0L,
63             TimeUnit.MILLISECONDS,
64             new LinkedBlockingQueue(max_queue_size),
65             executionQueueUtil.getThreadFactory(true, "appc-dispatcher"),
66             new ThreadPoolExecutor.AbortPolicy());
67     }
68
69     /**
70      * Destory method used by blueprint
71      */
72     public void stop() {
73         messageExecutor.shutdownNow();
74     }
75
76     public void setListener(MessageExpirationListener listener) {
77         this.listener = listener;
78     }
79
80     /**
81      * Injected by blueprint
82      *
83      * @param executionQueueUtil Util to be set
84      */
85     public void setExecutionQueueUtil(Util executionQueueUtil) {
86         this.executionQueueUtil = executionQueueUtil;
87     }
88
89     public boolean enqueueTask(QueueMessage queueMessage) {
90         boolean isEnqueued = true;
91         try {
92             messageExecutor.execute(() -> {
93                 if (queueMessage.isExpired()) {
94                     logger.debug("Message expired " + queueMessage.getMessage());
95                     if (listener != null) {
96                         listener.onMessageExpiration(queueMessage.getMessage());
97                     } else {
98                         logger.warn("Listener not available for expired message ");
99                     }
100                 } else {
101                     queueMessage.getMessage().run();
102                 }
103             });
104         } catch (RejectedExecutionException ree) {
105             isEnqueued = false;
106         }
107
108         return isEnqueued;
109     }
110 }