4513ffb825d8409686183bce3ef14822867b7d0d
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / models / sim / dmaap / provider / ConsumerGroupDataTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP Policy Models
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.models.sim.dmaap.provider;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertSame;
26 import static org.junit.Assert.assertTrue;
27
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37
38 public class ConsumerGroupDataTest {
39     private static final int WAIT_MS = 5000;
40     private static final int MIN_WAIT_MS = WAIT_MS / 2;
41     private static final String MY_TOPIC = "my-topic";
42     private static final String MY_CONSUMER = "my-consumer";
43     private static final String MSG1 = "hello";
44     private static final String MSG2 = "there";
45     private static final String MSG3 = "world";
46     private static final int MAX_THREADS = 30;
47
48     private MyData data;
49     private MyReader thread;
50     private List<MyReader> threads;
51
52     /**
53      * Sets up.
54      */
55     @Before
56     public void setUp() {
57         data = new MyData();
58         thread = null;
59         threads = new ArrayList<>(MAX_THREADS);
60     }
61
62     /**
63      * Stops any running thread.
64      */
65     @After
66     public void tearDown() {
67         for (MyReader thr : threads) {
68             thr.interrupt();
69         }
70
71         for (MyReader thr : threads) {
72             thr.await();
73         }
74     }
75
76     @Test
77     public void testShouldRemove() throws InterruptedException {
78         assertFalse(data.shouldRemove());
79         assertTrue(data.shouldRemove());
80
81         data = new MyData();
82
83         // start a reader thread and wait for it to poll its queue
84         startReader(0, 10);
85         assertTrue(data.await());
86
87         assertFalse(data.shouldRemove());
88     }
89
90     @Test
91     public void testRead() {
92         data.enqueue(MSG1, MSG2, MSG3, MSG1, MSG2, MSG3);
93
94         // this reader only wants one
95         startReader(1, 1);
96         assertTrue(thread.await());
97         assertEquals("[hello]", thread.result.toString());
98
99         // this reader wants three
100         startReader(3, 1);
101         assertTrue(thread.await());
102         assertEquals("[there, world, hello]", thread.result.toString());
103
104         // this reader wants three, but will only get two
105         startReader(3, 1);
106         assertTrue(thread.await());
107         assertEquals("[there, world]", thread.result.toString());
108     }
109
110     @Test
111     public void testRead_Idle() throws InterruptedException {
112         // force it to idle
113         data.shouldRemove();
114         data.shouldRemove();
115
116         long tbeg = System.currentTimeMillis();
117         assertSame(ConsumerGroupData.UNREADABLE_LIST, data.read(1, WAIT_MS));
118
119         // should not have waited
120         assertTrue(System.currentTimeMillis() < tbeg + MIN_WAIT_MS);
121     }
122
123     @Test
124     public void testRead_NegativeCount() throws InterruptedException {
125         data.enqueue(MSG1, MSG2);
126         startReader(-1, 3);
127         assertTrue(data.await());
128
129         // wait time should be unaffected
130         assertEquals(3L, data.waitMs2);
131
132         assertTrue(thread.await());
133
134         // should only return one message
135         assertEquals("[hello]", thread.result.toString());
136     }
137
138     @Test
139     public void testRead_NegativeWait() throws InterruptedException {
140         data.enqueue(MSG1, MSG2, MSG3);
141         startReader(2, -3);
142         assertTrue(data.await());
143
144         assertEquals(0L, data.waitMs2);
145
146         assertTrue(thread.await());
147
148         // should return two messages, as requested
149         assertEquals("[hello, there]", thread.result.toString());
150     }
151
152     @Test
153     public void testRead_NoMessages() throws InterruptedException {
154         startReader(0, 0);
155         assertTrue(data.await());
156
157         assertTrue(thread.await());
158         assertTrue(thread.result.isEmpty());
159     }
160
161     @Test
162     public void testRead_MultiThreaded() {
163         // queue up a bunch of messages
164         final int expected = MAX_THREADS * 3;
165         for (int x = 0; x < expected; ++x) {
166             data.enqueue(MSG1);
167         }
168
169         for (int x = 0; x < MAX_THREADS; ++x) {
170             startReader(4, 1);
171         }
172
173         int actual = 0;
174         for (MyReader thr : threads) {
175             thr.await();
176             actual += thr.result.size();
177         }
178
179         assertEquals(expected, actual);
180     }
181
182
183     /**
184      * Starts a reader thread.
185      *
186      * @param limit number of messages to read at one time
187      * @param waitMs wait time, in milliseconds
188      */
189     private void startReader(int limit, long waitMs) {
190         thread = new MyReader(limit, waitMs);
191
192         thread.setDaemon(true);
193         thread.start();
194
195         threads.add(thread);
196     }
197
198
199     private class MyData extends ConsumerGroupData {
200
201         /**
202          * Decremented when {@link #getNextMessage(long)} is invoked.
203          */
204         private final CountDownLatch latch = new CountDownLatch(1);
205
206         /**
207          * Messages to be added to the queue when {@link #getNextMessage(long)} is
208          * invoked.
209          */
210         private final List<String> messages = new ArrayList<>();
211
212         /**
213          * Value passed to {@link #getNextMessage(long)}.
214          */
215         private volatile long waitMs2 = -1;
216
217         /**
218          * Constructs the object.
219          */
220         public MyData() {
221             super(MY_TOPIC, MY_CONSUMER);
222         }
223
224         /**
225          * Arranges for messages to be injected into the queue the next time
226          * {@link #getNextMessage(long)} is invoked.
227          *
228          * @param messages the messages to be injected
229          */
230         public void enqueue(String... messages) {
231             this.messages.addAll(Arrays.asList(messages));
232         }
233
234         @Override
235         protected String getNextMessage(long waitMs) throws InterruptedException {
236             waitMs2 = waitMs;
237
238             latch.countDown();
239
240             synchronized (messages) {
241                 write(messages);
242                 messages.clear();
243             }
244
245             return super.getNextMessage(waitMs);
246         }
247
248         /**
249          * Waits for {@link #getNextMessage(long)} to be invoked.
250          *
251          * @return {@code true} if {@link #getNextMessage(long)} was invoked,
252          *         {@code false} if the timer expired first
253          * @throws InterruptedException if the current thread is interrupted while waiting
254          */
255         public boolean await() throws InterruptedException {
256             return latch.await(WAIT_MS, TimeUnit.MILLISECONDS);
257         }
258     }
259
260     /**
261      * Thread that will invoke the consumer group's read() method one time.
262      */
263     private class MyReader extends Thread {
264         private final ConsumerGroupData group = data;
265         private final int limit;
266         private final long waitMs;
267
268         /**
269          * Result returned by the read() method.
270          */
271         private List<String> result = Collections.emptyList();
272
273         public MyReader(int limit, long waitMs) {
274             this.limit = limit;
275             this.waitMs = waitMs;
276         }
277
278         @Override
279         public void run() {
280             try {
281                 result = group.read(limit, waitMs);
282
283             } catch (InterruptedException e) {
284                 Thread.currentThread().interrupt();
285             }
286         }
287
288         /**
289          * Waits for the thread to complete.
290          *
291          * @return {@code true} if the thread completed, {@code false} if the thread is
292          *         still running
293          */
294         public boolean await() {
295             try {
296                 this.join(WAIT_MS);
297
298             } catch (InterruptedException e) {
299                 Thread.currentThread().interrupt();
300             }
301
302             return !this.isAlive();
303         }
304     }
305 }