Sync Integ to Master
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / distribution / TestQueue.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.distribution;
22
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import org.openecomp.sdc.be.components.distribution.engine.CambriaHandler;
25 import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
26 import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl;
27
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Timer;
31 import java.util.TimerTask;
32 import java.util.concurrent.ArrayBlockingQueue;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.SynchronousQueue;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42
43 public class TestQueue {
44
45     public static void main(String[] args) {
46         ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
47         threadFactoryBuilder.setNameFormat("distribution-notification-thread");
48         ThreadFactory threadFactory = threadFactoryBuilder.build();
49         // TODO: add the package of google to the pom
50
51         ExecutorService executorService = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
52         // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L,
53         // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
54
55         // 2 threads are always up and they handle the tasks. in case core size
56         // is 0, only one is handles the tasks.
57         // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L,
58         // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
59
60         // TODO : check what happen when the number of threads are full. Throw
61         // RejectedExecutionException
62         // TODO : check what happen whether the pool is full and the size of
63         // pool
64
65         ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(threadFactory);
66         Runnable task = new Runnable() {
67
68             @Override
69             public void run() {
70                 try {
71                     System.out.println("iN SLEEP" + Thread.currentThread());
72                     Thread.sleep(10 * 1000);
73                     System.out.println("OUT SLEEP");
74                 } catch (InterruptedException e) {
75                     e.printStackTrace();
76                 }
77             }
78         };
79
80         for (int i = 0; i < 4; i++) {
81             try {
82                 executorService.submit(task);
83             } catch (RejectedExecutionException e) {
84                 e.printStackTrace();
85             }
86         }
87
88         newCachedThreadPool.submit(task);
89         System.out.println("After submitting the task");
90
91         MyWorker[] watchThreads = new MyWorker[1];
92         BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
93         for (int i = 0; i < watchThreads.length; i++) {
94             MyWorker myWorker = new MyWorker(queue);
95             myWorker.start();
96         }
97
98         for (int i = 0; i < 1; i++) {
99             try {
100                 queue.put("message " + i);
101             } catch (InterruptedException e) {
102                 e.printStackTrace();
103             }
104         }
105
106     }
107
108     public static class MyTimerTask extends TimerTask {
109
110         AtomicBoolean state;
111         Thread thread;
112
113         public MyTimerTask(AtomicBoolean state, Thread thread) {
114             super();
115             this.state = state;
116             this.thread = thread;
117
118             System.out.println("After create timer");
119         }
120
121         @Override
122         public void run() {
123             System.out.println("In running of Timer task");
124             if (state.get() == false) {
125                 System.out.println("In running of Timer task. Going to interrupt thread");
126                 // thread.interrupt();
127             } else {
128                 System.out.println("In running of Timer task. Finished.");
129             }
130         }
131
132     }
133
134     public static class MyWorker extends Thread {
135
136         boolean active = true;
137         private final BlockingQueue<String> queue;
138
139         public MyWorker(BlockingQueue<String> queue) {
140             this.queue = queue;
141         }
142
143         Timer timer = new Timer();
144
145         public void run() {
146             try {
147                 while (active) {
148                     String s = queue.take();
149                     System.out.println("Thread " + Thread.currentThread() + " fecthed a message " + s);
150
151                     AtomicBoolean atomicBoolean = new AtomicBoolean(false);
152                     MyTimerTask myTimerTask = new MyTimerTask(atomicBoolean, this);
153                     timer.schedule(myTimerTask, 10 * 1000);
154                     doWork(s);
155                     atomicBoolean.set(true);
156
157                 }
158             } catch (InterruptedException ie) {
159
160                 System.out.println("Interrupted our thread");
161                 ie.printStackTrace();
162             }
163         }
164
165         private void doWork(String s) {
166             // TODO Auto-generated method stub
167
168             CambriaHandler cambriaHandler = new CambriaHandler();
169             INotificationData data = new NotificationDataImpl();
170             List<String> servers = new ArrayList<>();
171             servers.add("aaaaaaa");
172             cambriaHandler.sendNotification("topicName", "uebPublicKey", "uebSecretKey", servers, data);
173
174             System.out.println("IN WORK " + s);
175             try {
176                 Thread.sleep(1 * 1000);
177             } catch (InterruptedException e) {
178
179                 for (int i = 0; i < 10; i++) {
180                     System.out.println("*************************************************");
181                 }
182                 e.printStackTrace();
183             }
184         }
185     }
186
187 }