2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.models.sim.dmaap.provider;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertSame;
25 import static org.mockito.Matchers.any;
26 import static org.mockito.Matchers.anyInt;
27 import static org.mockito.Matchers.anyLong;
28 import static org.mockito.Matchers.eq;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import javax.ws.rs.core.Response;
42 import javax.ws.rs.core.Response.Status;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.Captor;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.common.utils.coder.StandardCoderObject;
53 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
55 public class DmaapSimProviderTest {
56 private static final String EXPECTED_EXCEPTION = "expected exception";
57 private static final long SWEEP_SEC = 10L;
58 private static final String TOPIC1 = "topic-A";
59 private static final String TOPIC2 = "topic-B";
60 private static final String CONSUMER1 = "consumer-X";
61 private static final String CONSUMER_ID1 = "id1";
63 private MyProvider prov;
66 private DmaapSimParameterGroup params;
69 private ScheduledExecutorService timer;
72 private TopicData data1;
75 private TopicData data2;
78 private ArgumentCaptor<List<Object>> listCaptor;
85 MockitoAnnotations.initMocks(this);
87 when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
89 prov = new MyProvider(params);
93 * Shuts down the provider, if it's running.
96 public void tearDown() {
103 * Verifies that the constructor adds all of the expected actions to the service
107 public void testDmaapSimProvider() {
109 verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
112 verify(timer).shutdown();
116 public void testProcessDmaapMessagePut_List() throws CoderException {
117 prov = spy(new MyProvider(params));
119 when(data1.write(any())).thenReturn(2);
121 // force topics to exist
122 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
123 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
125 List<Object> lst = Arrays.asList("hello", "world");
126 Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
127 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
128 StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
129 assertEquals("2", sco.getString("count"));
131 List<Object> lst2 = Arrays.asList("helloB", "worldB");
132 prov.processDmaapMessagePut(TOPIC1, lst2);
133 prov.processDmaapMessagePut(TOPIC2, lst2);
135 // should only invoke this once for each topic
136 verify(prov).makeTopicData(TOPIC1);
137 verify(prov).makeTopicData(TOPIC2);
139 // should process all writes
140 verify(data1).write(lst);
141 verify(data1).write(lst2);
143 verify(data2).write(lst2);
147 public void testProcessDmaapMessagePut_Single() throws CoderException {
148 prov = spy(new MyProvider(params));
150 // force topics to exist
151 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
152 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
154 final String value1 = "abc";
155 Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
156 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
158 // ensure that the response can be decoded
159 new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
161 final String value2 = "def";
162 prov.processDmaapMessagePut(TOPIC1, value2);
163 prov.processDmaapMessagePut(TOPIC2, value2);
165 // should only invoke this once for each topic
166 verify(prov).makeTopicData(TOPIC1);
167 verify(prov).makeTopicData(TOPIC2);
169 // should process all writes as singleton lists
170 listCaptor.getAllValues().clear();
171 verify(data1, times(2)).write(listCaptor.capture());
172 assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
173 assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
175 listCaptor.getAllValues().clear();
176 verify(data2).write(listCaptor.capture());
177 assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(0));
181 public void testProcessDmaapMessageGet() throws InterruptedException {
182 List<String> msgs = Arrays.asList("400", "500");
183 when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
185 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
186 assertEquals(Status.OK.getStatusCode(), resp.getStatus());
187 assertEquals(msgs.toString(), resp.getEntity().toString());
191 public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
192 when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
194 Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
195 assertEquals(Status.REQUEST_TIMEOUT.getStatusCode(), resp.getStatus());
196 assertEquals("[]", resp.getEntity().toString());
200 public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
201 BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
203 // put in a background thread so it doesn't interrupt the tester thread
206 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
207 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
208 } catch (InterruptedException e) {
209 Thread.currentThread().interrupt();
213 Response resp = respQueue.poll(3, TimeUnit.SECONDS);
216 assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
217 assertEquals("[]", resp.getEntity().toString());
221 public void testSweepTopicTaskRun() {
223 prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
224 prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
226 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
227 verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
229 captor.getValue().run();
230 verify(data1).removeIdleConsumers();
231 verify(data2).removeIdleConsumers();
234 captor.getValue().run();
235 verify(data1, times(2)).removeIdleConsumers();
236 verify(data2, times(2)).removeIdleConsumers();
240 public void testMakeTimerPool() {
241 // use a real provider so we can test the real makeTimer() method
242 DmaapSimProvider prov2 = new DmaapSimProvider(params);
248 public void testMakeTopicData() {
249 // use a real provider so we can test the real makeTopicData() method
250 DmaapSimProvider prov2 = new DmaapSimProvider(params);
251 prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
255 public void testGetInstance_testSetInstance() {
256 DmaapSimProvider.setInstance(prov);
257 assertSame(prov, DmaapSimProvider.getInstance());
259 DmaapSimProvider.setInstance(null);
260 assertNull(DmaapSimProvider.getInstance());
264 public class MyProvider extends DmaapSimProvider {
266 public MyProvider(DmaapSimParameterGroup params) {
271 protected ScheduledExecutorService makeTimerPool() {
276 protected TopicData makeTopicData(String topicName) {
283 throw new IllegalArgumentException("unknown topic name: " + topicName);