056bfda297a8230bf69f5fb5e5563f05c27f3841
[appc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : APP-C
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
6  *                                              reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.openecomp.appc.executionqueue.impl;
23
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27
28 import org.openecomp.appc.executionqueue.MessageExpirationListener;
29 import org.openecomp.appc.executionqueue.helper.Util;
30 import org.openecomp.appc.executionqueue.impl.object.QueueMessage;
31
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
34
35 public class QueueManager {
36
37     private LinkedBlockingQueue<QueueMessage<? extends Runnable>> queue;
38
39     private MessageExpirationListener listener;
40
41     private static int MAX_QUEUE_SIZE = Util.getExecutionQueSize();
42
43     private static int MAX_THREAD_SIZE = Util.getThreadPoolSize();
44
45     private ExecutorService messageExecutor;
46
47     private static final EELFLogger logger =
48             EELFManager.getInstance().getLogger(QueueManager.class);
49
50     private QueueManager(){
51         init();
52     }
53
54     private static class QueueManagerHolder {
55         private static final QueueManager INSTANCE = new QueueManager();
56     }
57
58     public static QueueManager getInstance() {
59         return QueueManagerHolder.INSTANCE;
60     }
61
62     private void init(){
63         queue = new LinkedBlockingQueue<QueueMessage<? extends Runnable>>(MAX_QUEUE_SIZE);
64         messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true));
65
66         for(int i=0;i<MAX_THREAD_SIZE;i++){
67             messageExecutor.submit(new Runnable() {
68                 @Override
69                 public void run() {
70                     while (true){
71                         try{
72                             QueueMessage<? extends Runnable> queueMessage = queue.take();
73                             if (queueMessage.isExpired()) {
74                                 logger.debug("Message expired "+ queueMessage.getMessage());
75                                 if(listener != null){
76                                     listener.onMessageExpiration(queueMessage.getMessage());
77                                 }
78                                 else{
79                                     logger.warn("Listener not available for expired message ");
80                                 }
81                             }
82                             else{
83                                 queueMessage.getMessage().run();
84                             }
85                         } catch (Exception e) {
86                             logger.error("Error in startMessagePolling method of ExecutionQueueServiceImpl" + e.getMessage());
87                         }
88                     }
89                 }
90             });
91         }
92     }
93
94     public void setListener(MessageExpirationListener listener) {
95         this.listener = listener;
96     }
97
98     public boolean enqueueTask(QueueMessage<? extends Runnable> queueMessage) {
99         return queue.offer(queueMessage);
100     }
101
102 }