b6e0ca23cac6aebc996bd121ac08db0fbd4a6164
[ccsdk/features.git] /
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  * 
10  * http://www.apache.org/licenses/LICENSE-2.0
11  * 
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf;
19
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24
25 public class NotificationWorker<T> implements AutoCloseable {
26
27         private final BlockingQueue<T> workQueue;
28         private final ExecutorService service;
29         private final NotificationActor<T> actor;
30
31         public NotificationWorker(int numWorkers, int workQueueSize, NotificationActor<T> actorObject) {
32                 workQueue = new LinkedBlockingQueue<T>(workQueueSize);
33                 service = Executors.newFixedThreadPool(numWorkers);
34                 actor = actorObject;
35
36                 for (int i=0; i < numWorkers; i++) {
37                         service.submit(new Worker<T>(workQueue, actor));
38                 }
39         }
40
41         public void put(T item) {
42                 try {
43                         workQueue.put(item);
44                 } catch (InterruptedException ex) {
45                         Thread.currentThread().interrupt();
46                 }
47         }
48
49         @Override
50         public void close() throws Exception {
51                 // TODO Auto-generated method stub
52         }
53
54         private static class Worker<T> implements Runnable {
55         private final BlockingQueue<T> workQueue;
56                 private final NotificationActor<T> actor;
57
58
59         public Worker(BlockingQueue<T> workQueue, NotificationActor<T> actor) {
60             this.workQueue = workQueue;
61             this.actor = actor;
62         }
63
64                 @Override
65                 public void run() {
66                         while (!Thread.currentThread().isInterrupted()) {
67                                 try {
68                                         T item = workQueue.take();
69                                         actor.notificationActor(item);
70                                         // Process item
71                                 } catch (InterruptedException ex) {
72                                         Thread.currentThread().interrupt();
73                                         break;
74                                 }
75                         }
76                 }
77         }
78
79
80
81 }