b87349411c78d7d3c77836e84d4dfa57aa38a760
[appc.git] / appc-client / client-lib / src / main / java / org / openecomp / appc / client / impl / core / TaskQueueManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.client.impl.core;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29
30 import java.util.List;
31 import java.util.Properties;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.TimeUnit;
35
36 /** Creates a task queue pool that reuses a fixed number of threads.
37  * Assigns one thread for each queue.
38  */
39 class TaskQueueManager {
40     private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueueManager.class);
41     private ExecutorService executorService;
42     private final static String DEFAULT_POOL_SIZE = "10";
43     private final static String CLIENT_POOL_SIZE = "client.pool.size";
44     private TaskQueue[] queues;
45     private int poolInt;
46
47     TaskQueueManager(Properties properties){
48         String size = properties.getProperty(CLIENT_POOL_SIZE, DEFAULT_POOL_SIZE);
49         poolInt = Integer.parseInt(size);
50         this.executorService = Executors.newFixedThreadPool(poolInt);
51         initTaskQueues();
52     }
53
54     private void initTaskQueues(){
55         queues = new TaskQueue[poolInt];
56         for(int i=0; i<poolInt; i++){
57             queues[i] = new TaskQueue();
58             this.executorService.submit(queues[i]);
59         }
60     }
61
62     void submit(String corrID, Runnable task) throws InterruptedException {
63         TaskQueue queue = getTaskQueue(corrID);
64         queue.addTask(task);
65     }
66
67     /**
68      * ensures synchronous handling all responses and timeout belongs to same correlation ID
69      * @param corrID
70      * @return - @{@link TaskQueue}
71      */
72     private TaskQueue getTaskQueue(String corrID){
73         int index = Math.abs(corrID.hashCode()) % poolInt;
74         return queues[index];
75     }
76
77     /**
78      * goes over queues for stopping threads
79      * @throws InterruptedException
80      */
81     void stopQueueManager() throws InterruptedException {
82         for(int i=0; i<poolInt; i++){
83             queues[i].stopQueue();
84             queues[i].addTask(new Runnable() {
85                 @Override
86                 public void run() {
87                     /**
88                      * wake up the queue for stopping thread
89                      */
90                 }
91             });
92         }
93         List<Runnable> listTask = executorService.shutdownNow();
94         if (!executorService.awaitTermination(6, TimeUnit.SECONDS))
95             System.err.println("Pool did not terminate");
96         LOG.info("the amount of tasks that never commenced execution " + listTask.size());
97     }
98 }