f41a9038f24c17604aa7b1bb1bd2edf7618b1722
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP : ccsdk features
4  * ================================================================================
5  * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
6  * All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  */
22 package org.onap.ccsdk.features.sdnr.wt.common.threading;
23
24 import java.util.AbstractMap.SimpleEntry;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32
33 /**
34  * Threadpool for running n instances per key T
35  *
36  * @author jack
37  *
38  * @param <T>
39  * @param <S>
40  */
41 public class KeyBasedThreadpool<T, S> implements GenericRunnableFactoryCallback<T> {
42
43     private final Queue<Entry<T, S>> queue;
44     private final List<T> runningKeys;
45     private final int keyPoolSize;
46     private final GenericRunnableFactory<T, S> factory;
47     private final ExecutorService executor;
48
49     /**
50      *
51      * @param poolSize overall maximum amount of threads
52      * @param keyPoolSize amount of threads per key
53      * @param runner runnable to start
54      */
55     public KeyBasedThreadpool(int poolSize, int keyPoolSize, GenericRunnableFactory<T, S> factory) {
56         this.queue = new ConcurrentLinkedQueue<>();
57         this.keyPoolSize = keyPoolSize;
58         this.factory = factory;
59         this.executor = Executors.newFixedThreadPool(poolSize);
60         this.runningKeys = new ArrayList<>();
61     }
62
63     public void execute(T key, S arg) {
64         if (this.isKeyPoolSizeReached(key)) {
65             queue.add(new SimpleEntry<>(key, arg));
66         } else {
67             this.runningKeys.add(key);
68             this.executor.execute(this.factory.create(arg, this));
69         }
70
71     }
72
73     private void executeNext() {
74         Entry<T, S> entry = this.queue.peek();
75         if (!this.isKeyPoolSizeReached(entry.getKey())) {
76             this.queue.poll();
77             this.runningKeys.add(entry.getKey());
78             this.executor.execute(this.factory.create(entry.getValue(), this));
79         }
80     }
81
82     private boolean isKeyPoolSizeReached(T key) {
83         return this.runningKeys.stream().filter(e -> e == key).count() >= this.keyPoolSize;
84     }
85
86     @Override
87     public void onFinish(T key) {
88         this.runningKeys.remove(key);
89         this.executeNext();
90     }
91
92
93 }