2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.distribution;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import org.openecomp.sdc.be.components.distribution.engine.CambriaHandler;
25 import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
26 import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Timer;
31 import java.util.TimerTask;
32 import java.util.concurrent.*;
33 import java.util.concurrent.atomic.AtomicBoolean;
35 public class TestQueue {
37 public static void main(String[] args) {
38 ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
39 threadFactoryBuilder.setNameFormat("distribution-notification-thread");
40 ThreadFactory threadFactory = threadFactoryBuilder.build();
41 // TODO: add the package of google to the pom
43 ExecutorService executorService = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory);
44 // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L,
45 // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
47 // 2 threads are always up and they handle the tasks. in case core size
48 // is 0, only one is handles the tasks.
49 // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L,
50 // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
52 // TODO : check what happen when the number of threads are full. Throw
53 // RejectedExecutionException
54 // TODO : check what happen whether the pool is full and the size of
57 ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(threadFactory);
58 Runnable task = new Runnable() {
63 System.out.println("iN SLEEP" + Thread.currentThread());
64 Thread.sleep(10 * 1000);
65 System.out.println("OUT SLEEP");
66 } catch (InterruptedException e) {
72 for (int i = 0; i < 4; i++) {
74 executorService.submit(task);
75 } catch (RejectedExecutionException e) {
80 newCachedThreadPool.submit(task);
81 System.out.println("After submitting the task");
83 MyWorker[] watchThreads = new MyWorker[1];
84 BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
85 for (int i = 0; i < watchThreads.length; i++) {
86 MyWorker myWorker = new MyWorker(queue);
90 for (int i = 0; i < 1; i++) {
92 queue.put("message " + i);
93 } catch (InterruptedException e) {
100 public static class MyTimerTask extends TimerTask {
105 public MyTimerTask(AtomicBoolean state, Thread thread) {
108 this.thread = thread;
110 System.out.println("After create timer");
115 System.out.println("In running of Timer task");
117 System.out.println("In running of Timer task. Going to interrupt thread");
118 // thread.interrupt();
120 System.out.println("In running of Timer task. Finished.");
126 public static class MyWorker extends Thread {
128 boolean active = true;
129 private final BlockingQueue<String> queue;
131 public MyWorker(BlockingQueue<String> queue) {
135 Timer timer = new Timer();
140 String s = queue.take();
141 System.out.println("Thread " + Thread.currentThread() + " fecthed a message " + s);
143 AtomicBoolean atomicBoolean = new AtomicBoolean(false);
144 MyTimerTask myTimerTask = new MyTimerTask(atomicBoolean, this);
145 timer.schedule(myTimerTask, 10 * 1000);
147 atomicBoolean.set(true);
150 } catch (InterruptedException ie) {
152 System.out.println("Interrupted our thread");
153 ie.printStackTrace();
157 private void doWork(String s) {
158 // TODO Auto-generated method stub
160 CambriaHandler cambriaHandler = new CambriaHandler();
161 INotificationData data = new NotificationDataImpl();
162 List<String> servers = new ArrayList<>();
163 servers.add("aaaaaaa");
164 cambriaHandler.sendNotification("topicName", "uebPublicKey", "uebSecretKey", servers, data);
166 System.out.println("IN WORK " + s);
168 Thread.sleep(1 * 1000);
169 } catch (InterruptedException e) {
171 for (int i = 0; i < 10; i++) {
172 System.out.println("*************************************************");