2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.models.sim.dmaap.provider;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertSame;
26 import static org.junit.Assert.assertTrue;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
// Exercises ConsumerGroupData.shouldRemove() and read() through the MyData test double and the
// MyReader threads that are nested further down in this class.
38 public class ConsumerGroupDataTest {
// Timeout, in milliseconds: used by MyData.await() when waiting on its latch and passed to
// data.read() in testRead_Idle().
39 private static final int WAIT_MS = 5000;
// Half of WAIT_MS; testRead_Idle() uses it as the upper bound on elapsed time to show that the
// read did not block.
40 private static final int MIN_WAIT_MS = WAIT_MS / 2;
// Topic and consumer names handed to the ConsumerGroupData constructor by MyData.
41 private static final String MY_TOPIC = "my-topic";
42 private static final String MY_CONSUMER = "my-consumer";
// Sample message payloads staged by the read tests.
43 private static final String MSG1 = "hello";
44 private static final String MSG2 = "there";
45 private static final String MSG3 = "world";
// Initial capacity of the "threads" list; also the loop bound and the message multiplier in
// testRead_MultiThreaded().
46 private static final int MAX_THREADS = 30;
// Reader most recently created by startReader(); tests await it and inspect its result.
49 private MyReader thread;
// Readers iterated by tearDown() and testRead_MultiThreaded().
// NOTE(review): the statement that adds readers to this list is not visible in this excerpt.
50 private List<MyReader> threads;
// NOTE(review): the enclosing method header is not visible in this excerpt; this statement replaces
// the reader list with a fresh one presized to MAX_THREADS.
59 threads = new ArrayList<>(MAX_THREADS);
63 * Stops any running thread.
66 public void tearDown() {
// NOTE(review): both loop bodies are missing from this excerpt. Going by the description above,
// the two passes over "threads" presumably first signal every reader to stop and then wait for
// each one - confirm against the full file.
67 for (MyReader thr : threads) {
71 for (MyReader thr : threads) {
// Checks shouldRemove() at three points: on two successive calls with no reader, and again after
// a reader has polled the queue.
77 public void testShouldRemove() throws InterruptedException {
// The first call is expected to report false and the next one true.
// NOTE(review): why the answer flips between calls is decided inside ConsumerGroupData, which is
// not visible here.
78 assertFalse(data.shouldRemove());
79 assertTrue(data.shouldRemove());
83 // start a reader thread and wait for it to poll its queue
// data.await() is MyData.await(): it waits up to WAIT_MS on the latch that is documented as being
// decremented when getNextMessage() is invoked.
85 assertTrue(data.await());
// With a reader having polled, shouldRemove() is expected to report false again.
87 assertFalse(data.shouldRemove());
// Verifies that successive readers drain the staged messages in the order they were enqueued,
// each receiving no more than it asked for.
91 public void testRead() {
// Six messages are staged: MSG1..MSG3, twice over.
92 data.enqueue(MSG1, MSG2, MSG3, MSG1, MSG2, MSG3);
// NOTE(review): the statements that start each reader are not visible in this excerpt; the
// requested counts below are taken from the inline comments.
94 // this reader only wants one
96 assertTrue(thread.await());
97 assertEquals("[hello]", thread.result.toString());
99 // this reader wants three
101 assertTrue(thread.await());
// Continues where the previous reader stopped: messages two through four.
102 assertEquals("[there, world, hello]", thread.result.toString());
104 // this reader wants three, but will only get two
106 assertTrue(thread.await());
// Only the last two of the six staged messages remain.
107 assertEquals("[there, world]", thread.result.toString());
// Verifies that read() returns the UNREADABLE_LIST sentinel promptly instead of blocking for the
// requested wait time.
// NOTE(review): the setup that puts the consumer group into the idle state (per the method name)
// is not visible in this excerpt.
111 public void testRead_Idle() throws InterruptedException {
// Timestamp taken before the read so the elapsed time can be bounded afterwards.
116 long tbeg = System.currentTimeMillis();
// assertSame: the very same list instance must come back, not merely an equal list.
117 assertSame(ConsumerGroupData.UNREADABLE_LIST, data.read(1, WAIT_MS));
119 // should not have waited
// Elapsed time must be under MIN_WAIT_MS, i.e. less than half of the WAIT_MS passed to read().
120 assertTrue(System.currentTimeMillis() < tbeg + MIN_WAIT_MS);
// Verifies read() when given a negative message count (per the method name): the wait time still
// reaches getNextMessage() unchanged and exactly one message is returned.
124 public void testRead_NegativeCount() throws InterruptedException {
125 data.enqueue(MSG1, MSG2);
// NOTE(review): the call that starts the reader, with its actual arguments, is not visible in this
// excerpt; the expected 3L below implies a wait of 3 ms was requested - confirm.
// Blocks until the reader has reached getNextMessage(), or fails after WAIT_MS.
127 assertTrue(data.await());
129 // wait time should be unaffected
// waitMs2 is documented on MyData as the value passed to getNextMessage().
130 assertEquals(3L, data.waitMs2);
132 assertTrue(thread.await());
134 // should only return one message
// Two messages were staged, yet only the first comes back.
135 assertEquals("[hello]", thread.result.toString());
// Verifies read() when given a negative wait time (per the method name): getNextMessage() is
// expected to see a wait of zero, and the requested number of messages is still returned.
139 public void testRead_NegativeWait() throws InterruptedException {
140 data.enqueue(MSG1, MSG2, MSG3);
// NOTE(review): the call that starts the reader, with its actual arguments, is not visible in this
// excerpt.
// Blocks until the reader has reached getNextMessage(), or fails after WAIT_MS.
142 assertTrue(data.await());
// waitMs2 is documented on MyData as the value passed to getNextMessage(); zero is expected here.
144 assertEquals(0L, data.waitMs2);
146 assertTrue(thread.await());
148 // should return two messages, as requested
// Three messages were staged; the first two come back, in order.
149 assertEquals("[hello, there]", thread.result.toString());
// Verifies that a reader finishes with an empty result when no messages are staged (nothing is
// enqueued in the visible part of this test).
// NOTE(review): the call that starts the reader is not visible in this excerpt.
153 public void testRead_NoMessages() throws InterruptedException {
// The reader must reach getNextMessage() within WAIT_MS.
155 assertTrue(data.await());
// The reader thread must then terminate rather than block indefinitely.
157 assertTrue(thread.await());
158 assertTrue(thread.result.isEmpty());
// Verifies that a group of readers collectively receives every queued message: the result sizes
// summed over all readers must equal the number of messages queued.
162 public void testRead_MultiThreaded() {
163 // queue up a bunch of messages
// The expected total is three times MAX_THREADS.
164 final int expected = MAX_THREADS * 3;
// NOTE(review): the bodies of the next three loops, and the declaration of "actual", are missing
// from this excerpt. Presumably the first loop stages messages, the second starts MAX_THREADS
// readers and the third waits for each reader before counting - confirm against the full file.
165 for (int x = 0; x < expected; ++x) {
169 for (int x = 0; x < MAX_THREADS; ++x) {
174 for (MyReader thr : threads) {
// Accumulates the number of messages each reader received.
176 actual += thr.result.size();
// Every queued message must be accounted for exactly once across all readers.
179 assertEquals(expected, actual);
184 * Starts a reader thread.
186 * @param limit number of messages to read at one time
187 * @param waitMs wait time, in milliseconds
189 private void startReader(int limit, long waitMs) {
// The new reader is kept in the "thread" field so the calling test can await it and read its
// result.
190 thread = new MyReader(limit, waitMs);
// Daemon thread: a reader that never finishes cannot keep the JVM alive after the tests end.
// NOTE(review): the statements that register the reader in "threads" and actually start it are
// not visible in this excerpt.
192 thread.setDaemon(true);
// Test double for ConsumerGroupData. It lets a test stage messages ahead of time, observe the wait
// time handed to getNextMessage(), and block until getNextMessage() has been called.
199 private class MyData extends ConsumerGroupData {
202 * Decremented when {@link #getNextMessage(long)} is invoked.
// Initial count of one: a single invocation is enough to release await().
204 private final CountDownLatch latch = new CountDownLatch(1);
207 * Messages to be added to the queue when {@link #getNextMessage(long)} is
210 private final List<String> messages = new ArrayList<>();
213 * Value passed to {@link #getNextMessage(long)}.
// volatile because it is read by the test thread (e.g. data.waitMs2 in the negative-count test);
// the initial -1 differs from both values the tests assert on (3 and 0).
215 private volatile long waitMs2 = -1;
218 * Constructs the object.
221 super(MY_TOPIC, MY_CONSUMER);
225 * Arranges for messages to be injected into the queue the next time
226 * {@link #getNextMessage(long)} is invoked.
228 * @param messages the messages to be injected
230 public void enqueue(String... messages) {
// Appends to the staging list; the parameter shadows the field, hence the explicit "this.".
// NOTE(review): this visible statement is not guarded by the lock that getNextMessage() takes on
// "messages"; the visible tests call enqueue() as their first statement, before awaiting any
// reader.
231 this.messages.addAll(Arrays.asList(messages));
235 protected String getNextMessage(long waitMs) throws InterruptedException {
// NOTE(review): the statements around and inside this lock are missing from the excerpt. Going by
// the field descriptions above, this is presumably where the latch is counted down, waitMs is
// recorded in waitMs2 and the staged messages are moved into the real queue - confirm.
240 synchronized (messages) {
// Hands off to the real implementation with the caller's wait time unchanged.
245 return super.getNextMessage(waitMs);
249 * Waits for {@link #getNextMessage(long)} to be invoked.
251 * @return {@code true} if {@link #getNextMessage(long)} was invoked,
252 * {@code false} if the timer expired first
253 * @throws InterruptedException if the current thread is interrupted while waiting
255 public boolean await() throws InterruptedException {
// Bounded wait: gives up after WAIT_MS milliseconds so a broken test fails instead of hanging.
256 return latch.await(WAIT_MS, TimeUnit.MILLISECONDS);
261 * Thread that will invoke the consumer group's read() method one time.
263 private class MyReader extends Thread {
// Captured from the enclosing test's "data" field when the reader is constructed.
264 private final ConsumerGroupData group = data;
// Arguments forwarded to group.read().
265 private final int limit;
266 private final long waitMs;
269 * Result returned by the read() method.
// Starts out as an empty list, so a reader whose read() has not completed reports no messages
// instead of null.
271 private List<String> result = Collections.emptyList();
273 public MyReader(int limit, long waitMs) {
// NOTE(review): the assignment of "limit" is not visible in this excerpt.
275 this.waitMs = waitMs;
// NOTE(review): the header of the enclosing method (presumably run()) is not visible in this
// excerpt.
281 result = group.read(limit, waitMs);
283 } catch (InterruptedException e) {
// Restore the interrupt flag so the interruption is not lost.
284 Thread.currentThread().interrupt();
289 * Waits for the thread to complete.
291 * @return {@code true} if the thread completed, {@code false} if the thread is
294 public boolean await() {
// NOTE(review): the wait itself (presumably a timed join) is not visible in this excerpt.
298 } catch (InterruptedException e) {
// Restore the interrupt flag, then fall through and report the thread's current state.
299 Thread.currentThread().interrupt();
// true only if the thread is no longer alive at this point.
302 return !this.isAlive();