2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.models.sim.dmaap.provider;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertSame;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyInt;
27 import static org.mockito.ArgumentMatchers.anyLong;
28 import static org.mockito.ArgumentMatchers.eq;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import javax.ws.rs.core.Response;
42 import javax.ws.rs.core.Response.Status;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Ignore;
46 import org.junit.Test;
47 import org.mockito.ArgumentCaptor;
48 import org.mockito.Captor;
49 import org.mockito.Mock;
50 import org.mockito.MockitoAnnotations;
51 import org.onap.policy.common.utils.coder.CoderException;
52 import org.onap.policy.common.utils.coder.StandardCoder;
53 import org.onap.policy.common.utils.coder.StandardCoderObject;
54 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
56 public class DmaapSimProviderTest {
57 private static final String EXPECTED_EXCEPTION = "expected exception";
58 private static final long SWEEP_SEC = 10L;
59 private static final String TOPIC1 = "topic-A";
60 private static final String TOPIC2 = "topic-B";
61 private static final String CONSUMER1 = "consumer-X";
62 private static final String CONSUMER_ID1 = "id1";
64 private MyProvider prov;
67 private DmaapSimParameterGroup params;
70 private ScheduledExecutorService timer;
73 private TopicData data1;
76 private TopicData data2;
79 private ArgumentCaptor<List<Object>> listCaptor;
86 MockitoAnnotations.initMocks(this);
88 when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
90 prov = new MyProvider(params);
94 * Shuts down the provider, if it's running.
97 public void tearDown() {
104 * Verifies that the constructor adds all of the expected actions to the service
108 public void testDmaapSimProvider() {
110 verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
113 verify(timer).shutdown();
117 public void testProcessDmaapMessagePut_List() throws CoderException {
118 prov = spy(new MyProvider(params));
120 when(data1.write(any())).thenReturn(2);
122 // force topics to exist
123 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
124 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
126 List<Object> lst = Arrays.asList("hello", "world");
127 Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
128 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
129 StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
130 assertEquals("2", sco.getString("count"));
132 List<Object> lst2 = Arrays.asList("helloB", "worldB");
133 prov.processDmaapMessagePut(TOPIC1, lst2);
134 prov.processDmaapMessagePut(TOPIC2, lst2);
136 // should only invoke this once for each topic
137 verify(prov).makeTopicData(TOPIC1);
138 verify(prov).makeTopicData(TOPIC2);
140 // should process all writes
141 verify(data1).write(lst);
142 verify(data1).write(lst2);
144 verify(data2).write(lst2);
149 public void testProcessDmaapMessagePut_Single() throws CoderException {
150 prov = spy(new MyProvider(params));
152 // force topics to exist
153 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
154 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
156 final String value1 = "abc";
157 Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
158 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
160 // ensure that the response can be decoded
161 new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
163 final String value2 = "def";
164 prov.processDmaapMessagePut(TOPIC1, value2);
165 prov.processDmaapMessagePut(TOPIC2, value2);
167 // should only invoke this once for each topic
168 verify(prov).makeTopicData(TOPIC1);
169 verify(prov).makeTopicData(TOPIC2);
171 // should process all writes as singleton lists
172 listCaptor.getAllValues().clear();
173 verify(data1, times(2)).write(listCaptor.capture());
174 assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
175 assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
177 listCaptor.getAllValues().clear();
178 verify(data2).write(listCaptor.capture());
179 assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(0));
183 public void testProcessDmaapMessageGet() throws InterruptedException {
184 List<String> msgs = Arrays.asList("400", "500");
185 when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
187 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
188 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
189 assertEquals(msgs.toString(), resp.getEntity().toString());
193 public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
194 when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
196 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
197 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
198 assertEquals("[]", resp.getEntity().toString());
202 public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
203 BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
205 // put in a background thread so it doesn't interrupt the tester thread
208 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
209 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
210 } catch (InterruptedException e) {
211 Thread.currentThread().interrupt();
215 Response resp = respQueue.poll(3, TimeUnit.SECONDS);
218 assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
219 assertEquals("[]", resp.getEntity().toString());
223 public void testSweepTopicTaskRun() {
225 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
226 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
228 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
229 verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
231 captor.getValue().run();
232 verify(data1).removeIdleConsumers();
233 verify(data2).removeIdleConsumers();
236 captor.getValue().run();
237 verify(data1, times(2)).removeIdleConsumers();
238 verify(data2, times(2)).removeIdleConsumers();
242 public void testMakeTimerPool() {
243 // use a real provider so we can test the real makeTimer() method
244 DmaapSimProvider prov2 = new DmaapSimProvider(params);
250 public void testMakeTopicData() {
251 // use a real provider so we can test the real makeTopicData() method
252 DmaapSimProvider prov2 = new DmaapSimProvider(params);
253 prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
257 public void testGetInstance_testSetInstance() {
258 DmaapSimProvider.setInstance(prov);
259 assertSame(prov, DmaapSimProvider.getInstance());
261 DmaapSimProvider.setInstance(null);
262 assertNull(DmaapSimProvider.getInstance());
266 public class MyProvider extends DmaapSimProvider {
268 public MyProvider(DmaapSimParameterGroup params) {
273 protected ScheduledExecutorService makeTimerPool() {
278 protected TopicData makeTopicData(String topicName) {
285 throw new IllegalArgumentException("unknown topic name: " + topicName);