f507eec13bd217b2171615d39da8c5a31f45ed4d
[ccsdk/features.git] / sdnr / wt / common / src / main / java / org / onap / ccsdk / features / sdnr / wt / common / threading / KeyBasedThreadpool.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP : ccsdk features
4  * ================================================================================
5  * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
6  * All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  */
22 package org.onap.ccsdk.features.sdnr.wt.common.threading;
23
24 import java.util.AbstractMap.SimpleEntry;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Threadpool for running n instances per key T
38  *
39  * @author jack
40  *
41  * @param <T> type of key for the pools
42  * @param <S> type of arg to create a runner
43  */
44 public class KeyBasedThreadpool<T, S> implements GenericRunnableFactoryCallback<T> {
45
46     private static final Logger LOG = LoggerFactory.getLogger(KeyBasedThreadpool.class);
47     private final Queue<Entry<T, S>> queue;
48     private final List<T> runningKeys;
49     private final int keyPoolSize;
50     private final GenericRunnableFactory<T, S> factory;
51     private final ExecutorService executor;
52
53     /**
54      *
55      * @param poolSize overall maximum amount of threads
56      * @param keyPoolSize amount of threads per key
57      * @param runner runnable to start
58      */
59     public KeyBasedThreadpool(int poolSize, int keyPoolSize, GenericRunnableFactory<T, S> factory) {
60         this.queue = new ConcurrentLinkedQueue<>();
61         this.keyPoolSize = keyPoolSize;
62         this.factory = factory;
63         this.executor = Executors.newFixedThreadPool(poolSize);
64         this.runningKeys = Collections.synchronizedList(new ArrayList<T>());
65         LOG.info("starting key-based threadpool with keysize={} and size={}", keyPoolSize, poolSize);
66     }
67
68     public void execute(T key, S arg) {
69         if (this.isKeyPoolSizeReached(key)) {
70             LOG.debug("pool size for key {} reached. add to queue", key);
71             queue.add(new SimpleEntry<>(key, arg));
72
73         } else {
74             LOG.debug("starting executor for key {}.", key);
75             this.runningKeys.add(key);
76             this.executor.execute(new RunnableWrapper<T>(this.factory.create(key, arg), key, this));
77         }
78
79     }
80
81     private void executeNext() {
82         Entry<T, S> entry = this.queue.peek();
83         if (entry != null) {
84             LOG.debug("executing next for key {} with arg {}", entry.getKey(), entry.getValue());
85             if (!this.isKeyPoolSizeReached(entry.getKey())) {
86                 this.queue.poll();
87                 this.runningKeys.add(entry.getKey());
88                 this.executor.execute(new RunnableWrapper<T>(this.factory.create(entry.getKey(), entry.getValue()),
89                         entry.getKey(), this));
90             } else {
91                 LOG.debug("key pool size reached. waiting for someone else to stop");
92             }
93         } else {
94             LOG.info("nothing to execute. queue is empty.");
95         }
96     }
97
98     private boolean isKeyPoolSizeReached(T key) {
99         LOG.trace("running keys size={}", this.runningKeys.size());
100         return this.runningKeys.stream().filter(e -> e == key).count() >= this.keyPoolSize;
101     }
102
103     @Override
104     public synchronized void onFinish(T key) {
105         LOG.debug("executor finished for key {}.", key);
106         this.runningKeys.remove(key);
107         this.executeNext();
108     }
109
110     public void join() {
111         LOG.debug("wait for all executors to finish");
112         while (this.runningKeys.size() > 0 && this.queue.size() > 0) {
113             try {
114                 Thread.sleep(1000);
115             } catch (InterruptedException e) {
116                 Thread.currentThread().interrupt();
117             }
118         }
119     }
120
121     private static class RunnableWrapper<T> implements Runnable {
122
123         private final Runnable inner;
124         private final GenericRunnableFactoryCallback<T> callback;
125         private final T key;
126
127         public RunnableWrapper(Runnable inner, T key, GenericRunnableFactoryCallback<T> cb) {
128             this.inner = inner;
129             this.callback = cb;
130             this.key = key;
131         }
132
133         @Override
134         public void run() {
135             this.inner.run();
136             this.callback.onFinish(this.key);
137         }
138
139     }
140 }