2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.distribution;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import org.openecomp.sdc.be.components.distribution.engine.CambriaHandler;
25 import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
26 import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Timer;
31 import java.util.TimerTask;
32 import java.util.concurrent.ArrayBlockingQueue;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.SynchronousQueue;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
43 public class TestQueue {
45 public static void main(String[] args) {
46 ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
47 threadFactoryBuilder.setNameFormat("distribution-notification-thread");
48 ThreadFactory threadFactory = threadFactoryBuilder.build();
49 // TODO: add the package of google to the pom
51 ExecutorService executorService = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
52 // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L,
53 // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
55 // 2 threads are always up and they handle the tasks. in case core size
56 // is 0, only one is handles the tasks.
57 // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L,
58 // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
60 // TODO : check what happen when the number of threads are full. Throw
61 // RejectedExecutionException
62 // TODO : check what happen whether the pool is full and the size of
65 ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(threadFactory);
66 Runnable task = new Runnable() {
71 System.out.println("iN SLEEP" + Thread.currentThread());
72 Thread.sleep(10 * 1000);
73 System.out.println("OUT SLEEP");
74 } catch (InterruptedException e) {
80 for (int i = 0; i < 4; i++) {
82 executorService.submit(task);
83 } catch (RejectedExecutionException e) {
88 newCachedThreadPool.submit(task);
89 System.out.println("After submitting the task");
91 MyWorker[] watchThreads = new MyWorker[1];
92 BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
93 for (int i = 0; i < watchThreads.length; i++) {
94 MyWorker myWorker = new MyWorker(queue);
98 for (int i = 0; i < 1; i++) {
100 queue.put("message " + i);
101 } catch (InterruptedException e) {
108 public static class MyTimerTask extends TimerTask {
113 public MyTimerTask(AtomicBoolean state, Thread thread) {
116 this.thread = thread;
118 System.out.println("After create timer");
123 System.out.println("In running of Timer task");
124 if (state.get() == false) {
125 System.out.println("In running of Timer task. Going to interrupt thread");
126 // thread.interrupt();
128 System.out.println("In running of Timer task. Finished.");
134 public static class MyWorker extends Thread {
136 boolean active = true;
137 private final BlockingQueue<String> queue;
139 public MyWorker(BlockingQueue<String> queue) {
143 Timer timer = new Timer();
148 String s = queue.take();
149 System.out.println("Thread " + Thread.currentThread() + " fecthed a message " + s);
151 AtomicBoolean atomicBoolean = new AtomicBoolean(false);
152 MyTimerTask myTimerTask = new MyTimerTask(atomicBoolean, this);
153 timer.schedule(myTimerTask, 10 * 1000);
155 atomicBoolean.set(true);
158 } catch (InterruptedException ie) {
160 System.out.println("Interrupted our thread");
161 ie.printStackTrace();
165 private void doWork(String s) {
166 // TODO Auto-generated method stub
168 CambriaHandler cambriaHandler = new CambriaHandler();
169 INotificationData data = new NotificationDataImpl();
170 List<String> servers = new ArrayList<>();
171 servers.add("aaaaaaa");
172 cambriaHandler.sendNotification("topicName", "uebPublicKey", "uebSecretKey", servers, data);
174 System.out.println("IN WORK " + s);
176 Thread.sleep(1 * 1000);
177 } catch (InterruptedException e) {
179 for (int i = 0; i < 10; i++) {
180 System.out.println("*************************************************");