2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
4 * Modifications Copyright (C) 2023 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.onap.policy.models.sim.dmaap.provider;
22 import static org.assertj.core.api.Assertions.assertThatCode;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertNull;
26 import static org.junit.Assert.assertSame;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.anyInt;
29 import static org.mockito.ArgumentMatchers.anyLong;
30 import static org.mockito.ArgumentMatchers.eq;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.times;
33 import static org.mockito.Mockito.verify;
34 import static org.mockito.Mockito.when;
36 import jakarta.ws.rs.core.Response;
37 import jakarta.ws.rs.core.Response.Status;
38 import java.util.Arrays;
39 import java.util.Collections;
40 import java.util.List;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.ScheduledExecutorService;
44 import java.util.concurrent.TimeUnit;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.Captor;
51 import org.mockito.Mock;
52 import org.mockito.junit.MockitoJUnitRunner;
53 import org.onap.policy.common.utils.coder.CoderException;
54 import org.onap.policy.common.utils.coder.StandardCoder;
55 import org.onap.policy.common.utils.coder.StandardCoderObject;
56 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
// Unit tests for DmaapSimProvider, run with the Mockito JUnit runner. Most tests drive the
// provider through the MyProvider subclass declared in this class.
58 @RunWith(MockitoJUnitRunner.class)
59 public class DmaapSimProviderTest {
// Message given to the InterruptedException thrown in testProcessDmaapMessageGet_Ex.
60 private static final String EXPECTED_EXCEPTION = "expected exception";
// Value returned by the stubbed params.getTopicSweepSec(); testDmaapSimProvider expects it
// as both delay arguments of the scheduled timer task.
61 private static final long SWEEP_SEC = 10L;
// Topic names; MyProvider.makeTopicData() maps TOPIC1 to data1 and TOPIC2 to data2.
62 private static final String TOPIC1 = "topic-A";
63 private static final String TOPIC2 = "topic-B";
// Passed as the second and third arguments of processDmaapMessageGet().
// NOTE(review): presumably the consumer group and consumer id - confirm against DmaapSimProvider.
64 private static final String CONSUMER1 = "consumer-X";
65 private static final String CONSUMER_ID1 = "id1";
// Provider under test; the tests assign a MyProvider (optionally wrapped in a spy) to it.
67 private MyProvider prov;
// Stubbed with when(...) in the tests, so expected to be a Mockito mock.
70 private DmaapSimParameterGroup params;
// Target of verify(...) calls for scheduling and shutdown, so expected to be a Mockito mock.
73 private ScheduledExecutorService timer;
// Topic data returned by MyProvider.makeTopicData() for TOPIC1; stubbed and verified in the tests.
76 private TopicData data1;
// Topic data returned by MyProvider.makeTopicData() for TOPIC2.
79 private TopicData data2;
// Captures the lists passed to data1.write() in testProcessDmaapMessagePut_Single.
82 private ArgumentCaptor<List<Object>> listCaptor;
// Captures the lists passed to data2.write() in testProcessDmaapMessagePut_Single.
85 private ArgumentCaptor<List<Object>> listCaptor2;
// Stub the sweep interval; testDmaapSimProvider expects the timer to be scheduled with this value.
92 when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
// Build the provider under test from the stubbed parameters.
94 prov = new MyProvider(params);
98 * Shuts down the provider, if it's running.
// Cleanup step: it only acts when the provider still reports itself as alive.
101 public void tearDown() {
102 if (prov.isAlive()) {
108 * Verifies that the constructor adds all the expected actions to the service
// Checks how the provider uses the timer pool: one fixed-delay task is scheduled and the
// pool is shut down.
112 public void testDmaapSimProvider() {
// the task must be scheduled with SWEEP_SEC as both the initial delay and the repeat delay, in seconds
114 verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
// the timer pool must have been shut down exactly once by this point
117 verify(timer).shutdown();
// Verifies that putting a list of messages returns OK with the write count, and that each
// list is handed unchanged to the topic data of the addressed topic.
121 public void testProcessDmaapMessagePut_List() throws CoderException {
// wrap in a spy so that calls to makeTopicData() can be verified below
122 prov = spy(new MyProvider(params));
// every write to data1 reports two messages; the "count" assertion below depends on this value
124 when(data1.write(any())).thenReturn(2);
126 // force topics to exist
127 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
128 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
130 List<Object> lst = Arrays.asList("hello", "world");
131 Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
132 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
// the response entity must decode, and its "count" field must match the stubbed write result
133 StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
134 assertEquals("2", sco.getString("count"));
// a second list, written to both topics
136 List<Object> lst2 = Arrays.asList("helloB", "worldB");
137 prov.processDmaapMessagePut(TOPIC1, lst2);
138 prov.processDmaapMessagePut(TOPIC2, lst2);
140 // should only invoke this once for each topic
141 verify(prov).makeTopicData(TOPIC1);
142 verify(prov).makeTopicData(TOPIC2);
144 // should process all writes
145 verify(data1).write(lst);
146 verify(data1).write(lst2);
148 verify(data2).write(lst2);
// Verifies that putting a single (non-list) message returns OK and that the value reaches
// the topic data wrapped in a one-element list.
152 public void testProcessDmaapMessagePut_Single() throws CoderException {
// wrap in a spy so that calls to makeTopicData() can be verified below
153 prov = spy(new MyProvider(params));
155 // force topics to exist
156 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
157 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
159 final String value1 = "abc";
160 Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
161 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
163 // ensure that the response can be decoded
164 new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
// a second value, written to both topics
166 final String value2 = "def";
167 prov.processDmaapMessagePut(TOPIC1, value2);
168 prov.processDmaapMessagePut(TOPIC2, value2);
170 // should only invoke this once for each topic
171 verify(prov).makeTopicData(TOPIC1);
172 verify(prov).makeTopicData(TOPIC2);
174 // should process all writes as singleton lists
// data1 received two writes; the captured arguments are checked in call order
175 verify(data1, times(2)).write(listCaptor.capture());
176 assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
177 assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
// data2 received only the second value
179 verify(data2).write(listCaptor2.capture());
180 assertEquals(Collections.singletonList(value2), listCaptor2.getAllValues().get(0));
// Verifies that a get returns OK and that the response entity renders the messages read
// from the topic data.
184 public void testProcessDmaapMessageGet() throws InterruptedException {
// any read on data1 yields these two messages
185 List<String> msgs = Arrays.asList("400", "500");
186 when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
188 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
189 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
// the entity's string form must equal the list's own toString() output
190 assertEquals(msgs.toString(), resp.getEntity().toString());
// Verifies that a get still returns OK, with an empty "[]" entity, when the read yields no messages.
194 public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
// the empty list stands in for a read that produced nothing
195 when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
197 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
198 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
199 assertEquals("[]", resp.getEntity().toString());
// Verifies that a get answers with GONE and an empty "[]" entity when the read on the
// topic data throws InterruptedException.
203 public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
// carries the response from the code that performs the get back to the assertions below
204 BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
206 // put in a background thread, so it doesn't interrupt the tester thread
209 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
210 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
211 } catch (InterruptedException e) {
// restore the interrupt status instead of swallowing the exception
212 Thread.currentThread().interrupt();
// wait up to three seconds for the response to be queued
216 Response resp = respQueue.poll(3, TimeUnit.SECONDS);
219 assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
220 assertEquals("[]", resp.getEntity().toString());
// Verifies that the task handed to the timer removes idle consumers from every known topic
// each time it runs.
224 public void testSweepTopicTaskRun() {
// issue a get on each topic first, so the provider has requested topic data for both
226 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
227 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
// capture the Runnable that was passed to the timer
229 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
230 verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
// first run: each topic data is swept once
232 captor.getValue().run();
233 verify(data1).removeIdleConsumers();
234 verify(data2).removeIdleConsumers();
// second run: each topic data has now been swept twice in total
237 captor.getValue().run();
238 verify(data1, times(2)).removeIdleConsumers();
239 verify(data2, times(2)).removeIdleConsumers();
// Smoke test using the real DmaapSimProvider rather than the MyProvider subclass.
243 public void testMakeTimerPool() {
244 // use a real provider, so we can test the real makeTimer() method
245 DmaapSimProvider prov2 = new DmaapSimProvider(params);
// stopping the real provider must not throw
247 assertThatCode(prov2::stop).doesNotThrowAnyException();
// Smoke test: a get against the real DmaapSimProvider must complete without throwing.
251 public void testMakeTopicData() {
252 // use a real provider, so we can test the real makeTopicData() method
253 DmaapSimProvider prov2 = new DmaapSimProvider(params);
// limit and timeout are both zero for this call
254 assertThatCode(() -> prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0))
255 .doesNotThrowAnyException();
// Verifies that the static getInstance() returns whatever was last passed to setInstance().
259 public void testGetInstance_testSetInstance() {
// the same object must come back, not merely an equal one
260 DmaapSimProvider.setInstance(prov);
261 assertSame(prov, DmaapSimProvider.getInstance());
// setting null clears the stored instance
263 DmaapSimProvider.setInstance(null);
264 assertNull(DmaapSimProvider.getInstance());
268 public class MyProvider extends DmaapSimProvider {
270 public MyProvider(DmaapSimParameterGroup params) {
275 protected ScheduledExecutorService makeTimerPool() {
280 protected TopicData makeTopicData(String topicName) {
281 return switch (topicName) {
282 case TOPIC1 -> data1;
283 case TOPIC2 -> data2;
284 default -> throw new IllegalArgumentException("unknown topic name: " + topicName);