2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.models.sim.dmaap.provider;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertSame;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyInt;
27 import static org.mockito.ArgumentMatchers.anyLong;
28 import static org.mockito.ArgumentMatchers.eq;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import javax.ws.rs.core.Response;
42 import javax.ws.rs.core.Response.Status;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.Captor;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.common.utils.coder.StandardCoderObject;
53 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
// Tests for DmaapSimProvider, exercised through the MyProvider subclass declared in this file.
55 public class DmaapSimProviderTest {
// message carried by the InterruptedException that the "_Ex" test makes read() throw
56 private static final String EXPECTED_EXCEPTION = "expected exception";
// value returned by the stubbed params.getTopicSweepSec(); also the delay expected on the timer
57 private static final long SWEEP_SEC = 10L;
// topic names used by the tests
58 private static final String TOPIC1 = "topic-A";
59 private static final String TOPIC2 = "topic-B";
// consumer group and consumer id passed to every processDmaapMessageGet() call
60 private static final String CONSUMER1 = "consumer-X";
61 private static final String CONSUMER_ID1 = "id1";
// provider under test; rebuilt from "params" before each test
63 private MyProvider prov;
// parameter group handed to the provider's constructor; only getTopicSweepSec() is stubbed
66 private DmaapSimParameterGroup params;
// timer whose scheduleWithFixedDelay()/shutdown() calls are verified
69 private ScheduledExecutorService timer;
// topic data whose write()/read()/removeIdleConsumers() calls are stubbed and verified;
// presumably data1 backs TOPIC1 and data2 backs TOPIC2 — the mapping is not visible here
72 private TopicData data1;
75 private TopicData data2;
// captures the lists passed to data1.write()
78 private ArgumentCaptor<List<Object>> listCaptor;
// captures the lists passed to data2.write()
81 private ArgumentCaptor<List<Object>> listCaptor2;
// NOTE(review): the header of the enclosing method is not visible here; this appears to be the
// per-test setup.
// Let Mockito process the annotations on this test instance.
// NOTE(review): initMocks() is deprecated in newer Mockito releases in favor of openMocks() —
// confirm against the project's Mockito version.
88 MockitoAnnotations.initMocks(this);
// stub the sweep interval before the provider is constructed from these parameters
90 when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
92 prov = new MyProvider(params);
96 * Shuts down the provider, if it's running.
99 public void tearDown() {
// cleanup is only attempted when the provider still reports itself alive
100 if (prov.isAlive()) {
106 * Verifies that the constructor adds all of the expected actions to the service
110 public void testDmaapSimProvider() {
// the provider built during setup must have scheduled one recurring task on the timer, using
// the configured sweep interval as both the initial delay and the delay between runs
112 verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
// the timer must also have been shut down by the time this check runs
115 verify(timer).shutdown();
119 public void testProcessDmaapMessagePut_List() throws CoderException {
// wrap the provider in a spy so the makeTopicData() calls can be verified below
120 prov = spy(new MyProvider(params));
// the count returned by write() is what the response is expected to report back
122 when(data1.write(any())).thenReturn(2);
124 // force topics to exist
125 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
126 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
// a list payload should yield an OK response whose decoded entity has a "count" of "2"
128 List<Object> lst = Arrays.asList("hello", "world");
129 Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
130 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
131 StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
132 assertEquals("2", sco.getString("count"));
// write a second list to both topics
134 List<Object> lst2 = Arrays.asList("helloB", "worldB");
135 prov.processDmaapMessagePut(TOPIC1, lst2);
136 prov.processDmaapMessagePut(TOPIC2, lst2);
138 // should only invoke this once for each topic
139 verify(prov).makeTopicData(TOPIC1);
140 verify(prov).makeTopicData(TOPIC2);
142 // should process all writes
143 verify(data1).write(lst);
144 verify(data1).write(lst2);
146 verify(data2).write(lst2);
150 public void testProcessDmaapMessagePut_Single() throws CoderException {
// wrap the provider in a spy so the makeTopicData() calls can be verified below
151 prov = spy(new MyProvider(params));
153 // force topics to exist
154 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
155 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
// a single (non-list) payload should be accepted with an OK response
157 final String value1 = "abc";
158 Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
159 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
161 // ensure that the response can be decoded
162 new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
// write a second value to both topics
164 final String value2 = "def";
165 prov.processDmaapMessagePut(TOPIC1, value2);
166 prov.processDmaapMessagePut(TOPIC2, value2);
168 // should only invoke this once for each topic
169 verify(prov).makeTopicData(TOPIC1);
170 verify(prov).makeTopicData(TOPIC2);
172 // should process all writes as singleton lists
// data1 saw both values, in the order they were written
173 verify(data1, times(2)).write(listCaptor.capture());
174 assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
175 assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
// data2 saw only the second value
177 verify(data2).write(listCaptor2.capture());
178 assertEquals(Collections.singletonList(value2), listCaptor2.getAllValues().get(0));
182 public void testProcessDmaapMessageGet() throws InterruptedException {
// whatever read() returns should come back as the entity of an OK response
183 List<String> msgs = Arrays.asList("400", "500");
184 when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
186 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
187 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
188 assertEquals(msgs.toString(), resp.getEntity().toString());
192 public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
// an empty read result still yields an OK response, with an empty list as the entity
193 when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
195 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
196 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
197 assertEquals("[]", resp.getEntity().toString());
201 public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
// hands the response from the background work back to the test thread
202 BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
204 // put in a background thread so it doesn't interrupt the tester thread
// make read() throw, as if the wait for messages had been interrupted
207 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
208 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
209 } catch (InterruptedException e) {
// restore the interrupt flag instead of swallowing the interruption
210 Thread.currentThread().interrupt();
// wait a bounded time for the response to be published
214 Response resp = respQueue.poll(3, TimeUnit.SECONDS);
// an interrupted read is reported as GONE with an empty list as the entity
217 assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
218 assertEquals("[]", resp.getEntity().toString());
222 public void testSweepTopicTaskRun() {
// read from both topics so that they exist before the sweep task runs
224 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
225 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
// grab the task that the provider handed to the timer
227 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
228 verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
// one run of the task should remove idle consumers from every topic
230 captor.getValue().run();
231 verify(data1).removeIdleConsumers();
232 verify(data2).removeIdleConsumers();
// a second run should do so again
235 captor.getValue().run();
236 verify(data1, times(2)).removeIdleConsumers();
237 verify(data2, times(2)).removeIdleConsumers();
241 public void testMakeTimerPool() {
242 // use a real provider so we can test the real makeTimerPool() method
// (MyProvider declares its own makeTimerPool(), so it cannot be used for this)
243 DmaapSimProvider prov2 = new DmaapSimProvider(params);
249 public void testMakeTopicData() {
250 // use a real provider so we can test the real makeTopicData() method
251 DmaapSimProvider prov2 = new DmaapSimProvider(params);
// reading from a topic forces that topic to exist
252 prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
256 public void testGetInstance_testSetInstance() {
// the instance that was set is the very object that is returned
257 DmaapSimProvider.setInstance(prov);
258 assertSame(prov, DmaapSimProvider.getInstance());
// setting null clears the instance again
260 DmaapSimProvider.setInstance(null);
261 assertNull(DmaapSimProvider.getInstance());
// Provider subclass used by these tests; it supplies its own makeTimerPool() and makeTopicData().
// NOTE(review): the bodies of these methods are not visible here; presumably they hand back the
// test's timer and topic-data doubles — confirm.
265 public class MyProvider extends DmaapSimProvider {
267 public MyProvider(DmaapSimParameterGroup params) {
272 protected ScheduledExecutorService makeTimerPool() {
277 protected TopicData makeTopicData(String topicName) {
// reached for a topic name that this method does not recognize
284 throw new IllegalArgumentException("unknown topic name: " + topicName);