0c3014a0d6d51310e4719bba8b0f3b86bb13b230
[ccsdk/features.git] /
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.base.onfcore;
19
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24
25 /**
26  * Provide a thread that is receiving and process notifications.
27  * @param <T> represents the object that is provided with a notification and
28  * forwarded to the NotificationActor<T>.
29  *
30  * @author herbert
31  */
32
33 public class NotificationWorker<T> implements AutoCloseable {
34
35         private final BlockingQueue<T> workQueue;
36         private final ExecutorService service;
37         private final NotificationActor<T> actor;
38
39         public NotificationWorker(int numWorkers, int workQueueSize, NotificationActor<T> actorObject) {
40                 workQueue = new LinkedBlockingQueue<T>(workQueueSize);
41                 service = Executors.newFixedThreadPool(numWorkers);
42                 actor = actorObject;
43
44                 for (int i=0; i < numWorkers; i++) {
45                         service.submit(new Worker<T>(workQueue, actor));
46                 }
47         }
48
49         public void put(T item) {
50                 try {
51                         workQueue.put(item);
52                 } catch (InterruptedException ex) {
53                         Thread.currentThread().interrupt();
54                 }
55         }
56
57         @Override
58         public void close() throws Exception {
59                 // TODO Auto-generated method stub
60         }
61
62         private static class Worker<T> implements Runnable {
63         private final BlockingQueue<T> workQueue;
64                 private final NotificationActor<T> actor;
65
66
67         public Worker(BlockingQueue<T> workQueue, NotificationActor<T> actor) {
68             this.workQueue = workQueue;
69             this.actor = actor;
70         }
71
72                 @Override
73                 public void run() {
74                         while (!Thread.currentThread().isInterrupted()) {
75                                 try {
76                                         T item = workQueue.take();
77                                         actor.notificationActor(item);
78                                         // Process item
79                                 } catch (InterruptedException ex) {
80                                         Thread.currentThread().interrupt();
81                                         break;
82                                 }
83                         }
84                 }
85         }
86
87
88
89 }