2 * ============LICENSE_START=======================================================
3 * ONAP : ccsdk features
4 * ================================================================================
5 * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.features.sdnr.wt.common.threading;
24 import java.util.AbstractMap.SimpleEntry;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Threadpool for running n instances per key T
41 * @param <T> type of key for the pools
42 * @param <S> type of arg to create a runner
44 public class KeyBasedThreadpool<T, S> implements GenericRunnableFactoryCallback<T> {
46 private static final Logger LOG = LoggerFactory.getLogger(KeyBasedThreadpool.class);
47 private final ConcurrentLinkedQueue<Entry<T, S>> queue;
48 private final List<T> runningKeys;
49 private final int keyPoolSize;
50 private final GenericRunnableFactory<T, S> factory;
51 private final ExecutorService executor;
/**
 * Creates a pool that limits the number of concurrently running tasks per key.
 *
 * @param poolSize overall maximum amount of threads
 * @param keyPoolSize amount of threads per key
 * @param factory factory that creates the runnable to start for a key and its argument
 */
public KeyBasedThreadpool(int poolSize, int keyPoolSize, GenericRunnableFactory<T, S> factory) {
    this.keyPoolSize = keyPoolSize;
    this.factory = factory;
    // Keys of the tasks handed to the executor; a key appears once per running task.
    this.runningKeys = Collections.synchronizedList(new ArrayList<>());
    // Tasks that could not be started because the limit of their key was reached.
    this.queue = new ConcurrentLinkedQueue<>();
    this.executor = Executors.newFixedThreadPool(poolSize);
    LOG.info("starting key-based threadpool with keysize={} and size={}", keyPoolSize, poolSize);
}
68 public synchronized void execute(T key, S arg) {
69 if (this.isKeyPoolSizeReached(key)) {
70 LOG.debug("pool size for key {} reached. add to queue", key);
71 queue.add(new SimpleEntry<>(key, arg));
74 LOG.debug("starting executor for key {}.", key);
75 this.runningKeys.add(key);
76 this.executor.execute(new RunnableWrapper<T>(this.factory.create(key, arg), key, this));
81 private void executeNext() {
82 Entry<T, S> entry = this.queue.peek();
84 LOG.debug("executing next for key {} with arg {}", entry.getKey(), entry.getValue());
85 if (!this.isKeyPoolSizeReached(entry.getKey())) {
87 this.runningKeys.add(entry.getKey());
88 this.executor.execute(new RunnableWrapper<T>(this.factory.create(entry.getKey(), entry.getValue()),
89 entry.getKey(), this));
91 LOG.debug("key pool size reached. waiting for someone else to stop");
94 LOG.info("nothing to execute. queue is empty.");
98 private boolean isKeyPoolSizeReached(T key) {
99 LOG.trace("running keys size={}", this.runningKeys.size());
100 if (this.keyPoolSize == 1) {
101 return this.runningKeys.contains(key);
103 return this.runningKeys.stream().filter(e -> e == key).count() >= this.keyPoolSize;
107 public synchronized void onFinish(T key) {
108 LOG.debug("executor finished for key {}.", key);
109 this.runningKeys.remove(key);
113 public synchronized void join() {
114 LOG.debug("wait for all executors to finish");
115 while (this.runningKeys.size() > 0 && this.queue.size() > 0) {
118 } catch (InterruptedException e) {
119 Thread.currentThread().interrupt();
124 private static class RunnableWrapper<T> implements Runnable {
126 private final Runnable inner;
127 private final GenericRunnableFactoryCallback<T> callback;
130 public RunnableWrapper(Runnable inner, T key, GenericRunnableFactoryCallback<T> cb) {
139 this.callback.onFinish(this.key);