re base code
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / distribution / TestQueue.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.distribution;
22
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import org.openecomp.sdc.be.components.distribution.engine.CambriaHandler;
25 import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
26 import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl;
27
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Timer;
31 import java.util.TimerTask;
32 import java.util.concurrent.*;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 public class TestQueue {
36
37     public static void main(String[] args) {
38         ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
39         threadFactoryBuilder.setNameFormat("distribution-notification-thread");
40         ThreadFactory threadFactory = threadFactoryBuilder.build();
41         // TODO: add the package of google to the pom
42
43         ExecutorService executorService = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory);
44         // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L,
45         // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
46
47         // 2 threads are always up and they handle the tasks. in case core size
48         // is 0, only one is handles the tasks.
49         // ExecutorService executorService = new ThreadPoolExecutor(0, 2, 60L,
50         // TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));
51
52         // TODO : check what happen when the number of threads are full. Throw
53         // RejectedExecutionException
54         // TODO : check what happen whether the pool is full and the size of
55         // pool
56
57         ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(threadFactory);
58         Runnable task = new Runnable() {
59
60             @Override
61             public void run() {
62                 try {
63                     System.out.println("iN SLEEP" + Thread.currentThread());
64                     Thread.sleep(10 * 1000);
65                     System.out.println("OUT SLEEP");
66                 } catch (InterruptedException e) {
67                     e.printStackTrace();
68                 }
69             }
70         };
71
72         for (int i = 0; i < 4; i++) {
73             try {
74                 executorService.submit(task);
75             } catch (RejectedExecutionException e) {
76                 e.printStackTrace();
77             }
78         }
79
80         newCachedThreadPool.submit(task);
81         System.out.println("After submitting the task");
82
83         MyWorker[] watchThreads = new MyWorker[1];
84         BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
85         for (int i = 0; i < watchThreads.length; i++) {
86             MyWorker myWorker = new MyWorker(queue);
87             myWorker.start();
88         }
89
90         for (int i = 0; i < 1; i++) {
91             try {
92                 queue.put("message " + i);
93             } catch (InterruptedException e) {
94                 e.printStackTrace();
95             }
96         }
97
98     }
99
100     public static class MyTimerTask extends TimerTask {
101
102         AtomicBoolean state;
103         Thread thread;
104
105         public MyTimerTask(AtomicBoolean state, Thread thread) {
106             super();
107             this.state = state;
108             this.thread = thread;
109
110             System.out.println("After create timer");
111         }
112
113         @Override
114         public void run() {
115             System.out.println("In running of Timer task");
116             if (!state.get()) {
117                 System.out.println("In running of Timer task. Going to interrupt thread");
118                 // thread.interrupt();
119             } else {
120                 System.out.println("In running of Timer task. Finished.");
121             }
122         }
123
124     }
125
126     public static class MyWorker extends Thread {
127
128         boolean active = true;
129         private final BlockingQueue<String> queue;
130
131         public MyWorker(BlockingQueue<String> queue) {
132             this.queue = queue;
133         }
134
135         Timer timer = new Timer();
136
137         public void run() {
138             try {
139                 while (active) {
140                     String s = queue.take();
141                     System.out.println("Thread " + Thread.currentThread() + " fecthed a message " + s);
142
143                     AtomicBoolean atomicBoolean = new AtomicBoolean(false);
144                     MyTimerTask myTimerTask = new MyTimerTask(atomicBoolean, this);
145                     timer.schedule(myTimerTask, 10 * 1000);
146                     doWork(s);
147                     atomicBoolean.set(true);
148
149                 }
150             } catch (InterruptedException ie) {
151
152                 System.out.println("Interrupted our thread");
153                 ie.printStackTrace();
154             }
155         }
156
157         private void doWork(String s) {
158             // TODO Auto-generated method stub
159
160             CambriaHandler cambriaHandler = new CambriaHandler();
161             INotificationData data = new NotificationDataImpl();
162             List<String> servers = new ArrayList<>();
163             servers.add("aaaaaaa");
164             cambriaHandler.sendNotification("topicName", "uebPublicKey", "uebSecretKey", servers, data);
165
166             System.out.println("IN WORK " + s);
167             try {
168                 Thread.sleep(1 * 1000);
169             } catch (InterruptedException e) {
170
171                 for (int i = 0; i < 10; i++) {
172                     System.out.println("*************************************************");
173                 }
174                 e.printStackTrace();
175             }
176         }
177     }
178
179 }