2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.models.sim.dmaap.provider;
21 import static org.assertj.core.api.Assertions.assertThatCode;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertSame;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.ArgumentMatchers.anyInt;
28 import static org.mockito.ArgumentMatchers.anyLong;
29 import static org.mockito.ArgumentMatchers.eq;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.LinkedBlockingQueue;
40 import java.util.concurrent.ScheduledExecutorService;
41 import java.util.concurrent.TimeUnit;
42 import javax.ws.rs.core.Response;
43 import javax.ws.rs.core.Response.Status;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.mockito.ArgumentCaptor;
49 import org.mockito.Captor;
50 import org.mockito.Mock;
51 import org.mockito.junit.MockitoJUnitRunner;
52 import org.onap.policy.common.utils.coder.CoderException;
53 import org.onap.policy.common.utils.coder.StandardCoder;
54 import org.onap.policy.common.utils.coder.StandardCoderObject;
55 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
57 @RunWith(MockitoJUnitRunner.class)
58 public class DmaapSimProviderTest {
59 private static final String EXPECTED_EXCEPTION = "expected exception";
60 private static final long SWEEP_SEC = 10L;
61 private static final String TOPIC1 = "topic-A";
62 private static final String TOPIC2 = "topic-B";
63 private static final String CONSUMER1 = "consumer-X";
64 private static final String CONSUMER_ID1 = "id1";
66 private MyProvider prov;
69 private DmaapSimParameterGroup params;
72 private ScheduledExecutorService timer;
75 private TopicData data1;
78 private TopicData data2;
81 private ArgumentCaptor<List<Object>> listCaptor;
84 private ArgumentCaptor<List<Object>> listCaptor2;
91 when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
93 prov = new MyProvider(params);
97 * Shuts down the provider, if it's running.
100 public void tearDown() {
101 if (prov.isAlive()) {
107 * Verifies that the constructor adds all of the expected actions to the service
111 public void testDmaapSimProvider() {
113 verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
116 verify(timer).shutdown();
120 public void testProcessDmaapMessagePut_List() throws CoderException {
121 prov = spy(new MyProvider(params));
123 when(data1.write(any())).thenReturn(2);
125 // force topics to exist
126 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
127 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
129 List<Object> lst = Arrays.asList("hello", "world");
130 Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
131 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
132 StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
133 assertEquals("2", sco.getString("count"));
135 List<Object> lst2 = Arrays.asList("helloB", "worldB");
136 prov.processDmaapMessagePut(TOPIC1, lst2);
137 prov.processDmaapMessagePut(TOPIC2, lst2);
139 // should only invoke this once for each topic
140 verify(prov).makeTopicData(TOPIC1);
141 verify(prov).makeTopicData(TOPIC2);
143 // should process all writes
144 verify(data1).write(lst);
145 verify(data1).write(lst2);
147 verify(data2).write(lst2);
151 public void testProcessDmaapMessagePut_Single() throws CoderException {
152 prov = spy(new MyProvider(params));
154 // force topics to exist
155 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
156 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
158 final String value1 = "abc";
159 Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
160 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
162 // ensure that the response can be decoded
163 new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
165 final String value2 = "def";
166 prov.processDmaapMessagePut(TOPIC1, value2);
167 prov.processDmaapMessagePut(TOPIC2, value2);
169 // should only invoke this once for each topic
170 verify(prov).makeTopicData(TOPIC1);
171 verify(prov).makeTopicData(TOPIC2);
173 // should process all writes as singleton lists
174 verify(data1, times(2)).write(listCaptor.capture());
175 assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
176 assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
178 verify(data2).write(listCaptor2.capture());
179 assertEquals(Collections.singletonList(value2), listCaptor2.getAllValues().get(0));
183 public void testProcessDmaapMessageGet() throws InterruptedException {
184 List<String> msgs = Arrays.asList("400", "500");
185 when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
187 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
188 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
189 assertEquals(msgs.toString(), resp.getEntity().toString());
193 public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
194 when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
196 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
197 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
198 assertEquals("[]", resp.getEntity().toString());
202 public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
203 BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
205 // put in a background thread so it doesn't interrupt the tester thread
208 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
209 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
210 } catch (InterruptedException e) {
211 Thread.currentThread().interrupt();
215 Response resp = respQueue.poll(3, TimeUnit.SECONDS);
218 assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
219 assertEquals("[]", resp.getEntity().toString());
223 public void testSweepTopicTaskRun() {
225 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
226 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
228 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
229 verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
231 captor.getValue().run();
232 verify(data1).removeIdleConsumers();
233 verify(data2).removeIdleConsumers();
236 captor.getValue().run();
237 verify(data1, times(2)).removeIdleConsumers();
238 verify(data2, times(2)).removeIdleConsumers();
242 public void testMakeTimerPool() {
243 // use a real provider so we can test the real makeTimer() method
244 DmaapSimProvider prov2 = new DmaapSimProvider(params);
246 assertThatCode(() -> prov2.stop()).doesNotThrowAnyException();
250 public void testMakeTopicData() {
251 // use a real provider so we can test the real makeTopicData() method
252 DmaapSimProvider prov2 = new DmaapSimProvider(params);
253 assertThatCode(() -> prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0))
254 .doesNotThrowAnyException();
258 public void testGetInstance_testSetInstance() {
259 DmaapSimProvider.setInstance(prov);
260 assertSame(prov, DmaapSimProvider.getInstance());
262 DmaapSimProvider.setInstance(null);
263 assertNull(DmaapSimProvider.getInstance());
267 public class MyProvider extends DmaapSimProvider {
269 public MyProvider(DmaapSimParameterGroup params) {
274 protected ScheduledExecutorService makeTimerPool() {
279 protected TopicData makeTopicData(String topicName) {
286 throw new IllegalArgumentException("unknown topic name: " + topicName);