2 * ============LICENSE_START=======================================================
3 * ONAP : ccsdk features
4 * ================================================================================
5 * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.features.sdnr.wt.common.threading;
24 import java.util.AbstractMap.SimpleEntry;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
34 * Threadpool for running n instances per key T
41 public class KeyBasedThreadpool<T, S> implements GenericRunnableFactoryCallback<T> {
43 private final Queue<Entry<T, S>> queue;
44 private final List<T> runningKeys;
45 private final int keyPoolSize;
46 private final GenericRunnableFactory<T, S> factory;
47 private final ExecutorService executor;
51 * @param poolSize overall maximum amount of threads
52 * @param keyPoolSize amount of threads per key
53 * @param runner runnable to start
55 public KeyBasedThreadpool(int poolSize, int keyPoolSize, GenericRunnableFactory<T, S> factory) {
56 this.queue = new ConcurrentLinkedQueue<>();
57 this.keyPoolSize = keyPoolSize;
58 this.factory = factory;
59 this.executor = Executors.newFixedThreadPool(poolSize);
60 this.runningKeys = new ArrayList<>();
63 public void execute(T key, S arg) {
64 if (this.isKeyPoolSizeReached(key)) {
65 queue.add(new SimpleEntry<>(key, arg));
67 this.runningKeys.add(key);
68 this.executor.execute(this.factory.create(arg, this));
73 private void executeNext() {
74 Entry<T, S> entry = this.queue.peek();
75 if (!this.isKeyPoolSizeReached(entry.getKey())) {
77 this.runningKeys.add(entry.getKey());
78 this.executor.execute(this.factory.create(entry.getValue(), this));
82 private boolean isKeyPoolSizeReached(T key) {
83 return this.runningKeys.stream().filter(e -> e == key).count() >= this.keyPoolSize;
87 public void onFinish(T key) {
88 this.runningKeys.remove(key);