TaskQueueManager fixes
[appc.git] / appc-client / client-lib / src / main / java / org / onap / appc / client / impl / core / TaskQueueManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.client.impl.core;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.util.List;
30 import java.util.Properties;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.TimeUnit;
34
35 /**
36  * Creates a task queue pool that reuses a fixed number of threads. Assigns one thread for each queue.
37  */
38 class TaskQueueManager {
39
40     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueueManager.class);
41     private static final String DEFAULT_POOL_SIZE = "10";
42     private static final String CLIENT_POOL_SIZE = "client.pool.size";
43
44     private ExecutorService executorService;
45     private TaskQueue[] queues;
46     private int poolInt;
47
48     TaskQueueManager(Properties properties) {
49         String size = properties.getProperty(CLIENT_POOL_SIZE, DEFAULT_POOL_SIZE);
50         poolInt = Integer.parseInt(size);
51         this.executorService = Executors.newFixedThreadPool(poolInt);
52         initTaskQueues();
53     }
54
55     private void initTaskQueues() {
56         queues = new TaskQueue[poolInt];
57         for (int i = 0; i < poolInt; i++) {
58             queues[i] = new TaskQueue();
59             this.executorService.submit(queues[i]);
60         }
61     }
62
63     void submit(String corrID, Runnable task) throws InterruptedException {
64         TaskQueue queue = getTaskQueue(corrID);
65         queue.addTask(task);
66     }
67
68     /**
69      * ensures synchronous handling all responses and timeout belongs to same correlation ID
70      * @return - @{@link TaskQueue}
71      */
72     private TaskQueue getTaskQueue(String corrID) {
73         int index = Math.abs(corrID.hashCode()) % poolInt;
74         return queues[index];
75     }
76
77     /**
78      * goes over queues for stopping threads
79      */
80     void stopQueueManager() throws InterruptedException {
81         for (int i = 0; i < poolInt; i++) {
82             queues[i].stopQueue();
83             queues[i].addTask(() -> {
84              // wake up the queue for stopping thread
85             });
86         }
87         List<Runnable> listTask = executorService.shutdownNow();
88         if (!executorService.awaitTermination(6, TimeUnit.SECONDS)) {
89             LOG.error("Pool did not terminate");
90         }
91         LOG.info("the amount of tasks that never commenced execution " + listTask.size());
92     }
93 }