add fixes for wt sulfur
[ccsdk/features.git] / sdnr / wt / common / src / main / java / org / onap / ccsdk / features / sdnr / wt / common / threading / KeyBasedThreadpool.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP : ccsdk features
4  * ================================================================================
5  * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
6  * All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  */
22 package org.onap.ccsdk.features.sdnr.wt.common.threading;
23
24 import java.util.AbstractMap.SimpleEntry;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Threadpool for running n instances per key T
38  *
39  * @author jack
40  *
41  * @param <T> type of key for the pools
42  * @param <S> type of arg to create a runner
43  */
44 public class KeyBasedThreadpool<T, S> implements GenericRunnableFactoryCallback<T> {
45
46     private static final Logger LOG = LoggerFactory.getLogger(KeyBasedThreadpool.class);
47     private final ConcurrentLinkedQueue<Entry<T, S>> queue;
48     private final List<T> runningKeys;
49     private final int keyPoolSize;
50     private final GenericRunnableFactory<T, S> factory;
51     private final ExecutorService executor;
52
53     /**
54      *
55      * @param poolSize overall maximum amount of threads
56      * @param keyPoolSize amount of threads per key
57      * @param runner runnable to start
58      */
59     public KeyBasedThreadpool(int poolSize, int keyPoolSize, GenericRunnableFactory<T, S> factory) {
60         this.queue = new ConcurrentLinkedQueue<>();
61         this.keyPoolSize = keyPoolSize;
62         this.factory = factory;
63         this.executor = Executors.newFixedThreadPool(poolSize);
64         this.runningKeys = Collections.synchronizedList(new ArrayList<T>());
65         LOG.info("starting key-based threadpool with keysize={} and size={}", keyPoolSize, poolSize);
66     }
67
68     public synchronized void execute(T key, S arg) {
69         if (this.isKeyPoolSizeReached(key)) {
70             LOG.debug("pool size for key {} reached. add to queue", key);
71             queue.add(new SimpleEntry<>(key, arg));
72
73         } else {
74             LOG.debug("starting executor for key {}.", key);
75             this.runningKeys.add(key);
76             this.executor.execute(new RunnableWrapper<T>(this.factory.create(key, arg), key, this));
77         }
78
79     }
80
81     private void executeNext() {
82         Entry<T, S> entry = this.queue.peek();
83         if (entry != null) {
84             LOG.debug("executing next for key {} with arg {}", entry.getKey(), entry.getValue());
85             if (!this.isKeyPoolSizeReached(entry.getKey())) {
86                 this.queue.poll();
87                 this.runningKeys.add(entry.getKey());
88                 this.executor.execute(new RunnableWrapper<T>(this.factory.create(entry.getKey(), entry.getValue()),
89                         entry.getKey(), this));
90             } else {
91                 LOG.debug("key pool size reached. waiting for someone else to stop");
92             }
93         } else {
94             LOG.info("nothing to execute. queue is empty.");
95         }
96     }
97
98     private boolean isKeyPoolSizeReached(T key) {
99         LOG.trace("running keys size={}", this.runningKeys.size());
100         if (this.keyPoolSize == 1) {
101             return this.runningKeys.contains(key);
102         }
103         return this.runningKeys.stream().filter(e -> e == key).count() >= this.keyPoolSize;
104     }
105
106     @Override
107     public synchronized void onFinish(T key) {
108         LOG.debug("executor finished for key {}.", key);
109         this.runningKeys.remove(key);
110         this.executeNext();
111     }
112
113     public synchronized void join() {
114         LOG.debug("wait for all executors to finish");
115         while (this.runningKeys.size() > 0 && this.queue.size() > 0) {
116             try {
117                 Thread.sleep(1000);
118             } catch (InterruptedException e) {
119                 Thread.currentThread().interrupt();
120             }
121         }
122     }
123
124     private static class RunnableWrapper<T> implements Runnable {
125
126         private final Runnable inner;
127         private final GenericRunnableFactoryCallback<T> callback;
128         private final T key;
129
130         public RunnableWrapper(Runnable inner, T key, GenericRunnableFactoryCallback<T> cb) {
131             this.inner = inner;
132             this.callback = cb;
133             this.key = key;
134         }
135
136         @Override
137         public void run() {
138             this.inner.run();
139             this.callback.onFinish(this.key);
140         }
141
142     }
143 }