Fix timeout bug in dmaap simulator
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProviderTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.models.sim.dmaap.provider;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertSame;
25 import static org.mockito.Matchers.any;
26 import static org.mockito.Matchers.anyInt;
27 import static org.mockito.Matchers.anyLong;
28 import static org.mockito.Matchers.eq;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import javax.ws.rs.core.Response;
42 import javax.ws.rs.core.Response.Status;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.Captor;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.common.utils.coder.StandardCoderObject;
53 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
54
55 public class DmaapSimProviderTest {
56     private static final String EXPECTED_EXCEPTION = "expected exception";
57     private static final long SWEEP_SEC = 10L;
58     private static final String TOPIC1 = "topic-A";
59     private static final String TOPIC2 = "topic-B";
60     private static final String CONSUMER1 = "consumer-X";
61     private static final String CONSUMER_ID1 = "id1";
62
63     private MyProvider prov;
64
65     @Mock
66     private DmaapSimParameterGroup params;
67
68     @Mock
69     private ScheduledExecutorService timer;
70
71     @Mock
72     private TopicData data1;
73
74     @Mock
75     private TopicData data2;
76
77     @Captor
78     private ArgumentCaptor<List<Object>> listCaptor;
79
80     /**
81      * Sets up.
82      */
83     @Before
84     public void setUp() {
85         MockitoAnnotations.initMocks(this);
86
87         when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
88
89         prov = new MyProvider(params);
90     }
91
92     /**
93      * Shuts down the provider, if it's running.
94      */
95     @After
96     public void tearDown() {
97         if (prov.isAlive()) {
98             prov.shutdown();
99         }
100     }
101
102     /**
103      * Verifies that the constructor adds all of the expected actions to the service
104      * manager container.
105      */
106     @Test
107     public void testDmaapSimProvider() {
108         prov.start();
109         verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
110
111         prov.stop();
112         verify(timer).shutdown();
113     }
114
115     @Test
116     public void testProcessDmaapMessagePut_List() throws CoderException {
117         prov = spy(new MyProvider(params));
118
119         when(data1.write(any())).thenReturn(2);
120
121         // force topics to exist
122         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
123         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
124
125         List<Object> lst = Arrays.asList("hello", "world");
126         Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
127         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
128         StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
129         assertEquals("2", sco.getString("count"));
130
131         List<Object> lst2 = Arrays.asList("helloB", "worldB");
132         prov.processDmaapMessagePut(TOPIC1, lst2);
133         prov.processDmaapMessagePut(TOPIC2, lst2);
134
135         // should only invoke this once for each topic
136         verify(prov).makeTopicData(TOPIC1);
137         verify(prov).makeTopicData(TOPIC2);
138
139         // should process all writes
140         verify(data1).write(lst);
141         verify(data1).write(lst2);
142
143         verify(data2).write(lst2);
144     }
145
146     @Test
147     public void testProcessDmaapMessagePut_Single() throws CoderException {
148         prov = spy(new MyProvider(params));
149
150         // force topics to exist
151         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
152         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
153
154         final String value1 = "abc";
155         Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
156         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
157
158         // ensure that the response can be decoded
159         new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
160
161         final String value2 = "def";
162         prov.processDmaapMessagePut(TOPIC1, value2);
163         prov.processDmaapMessagePut(TOPIC2, value2);
164
165         // should only invoke this once for each topic
166         verify(prov).makeTopicData(TOPIC1);
167         verify(prov).makeTopicData(TOPIC2);
168
169         // should process all writes as singleton lists
170         listCaptor.getAllValues().clear();
171         verify(data1, times(2)).write(listCaptor.capture());
172         assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
173         assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
174
175         listCaptor.getAllValues().clear();
176         verify(data2).write(listCaptor.capture());
177         assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(0));
178     }
179
180     @Test
181     public void testProcessDmaapMessageGet() throws InterruptedException {
182         List<String> msgs = Arrays.asList("400", "500");
183         when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
184
185         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
186         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
187         assertEquals(msgs.toString(), resp.getEntity().toString());
188     }
189
190     @Test
191     public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
192         when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
193
194         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
195         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
196         assertEquals("[]", resp.getEntity().toString());
197     }
198
199     @Test
200     public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
201         BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
202
203         // put in a background thread so it doesn't interrupt the tester thread
204         new Thread(() -> {
205             try {
206                 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
207                 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
208             } catch (InterruptedException e) {
209                 Thread.currentThread().interrupt();
210             }
211         }).start();
212
213         Response resp = respQueue.poll(3, TimeUnit.SECONDS);
214         assertNotNull(resp);
215
216         assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
217         assertEquals("[]", resp.getEntity().toString());
218     }
219
220     @Test
221     public void testSweepTopicTaskRun() {
222         prov.start();
223         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
224         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
225
226         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
227         verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
228
229         captor.getValue().run();
230         verify(data1).removeIdleConsumers();
231         verify(data2).removeIdleConsumers();
232
233         // run it again
234         captor.getValue().run();
235         verify(data1, times(2)).removeIdleConsumers();
236         verify(data2, times(2)).removeIdleConsumers();
237     }
238
239     @Test
240     public void testMakeTimerPool() {
241         // use a real provider so we can test the real makeTimer() method
242         DmaapSimProvider prov2 = new DmaapSimProvider(params);
243         prov2.start();
244         prov2.stop();
245     }
246
247     @Test
248     public void testMakeTopicData() {
249         // use a real provider so we can test the real makeTopicData() method
250         DmaapSimProvider prov2 = new DmaapSimProvider(params);
251         prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
252     }
253
254     @Test
255     public void testGetInstance_testSetInstance() {
256         DmaapSimProvider.setInstance(prov);
257         assertSame(prov, DmaapSimProvider.getInstance());
258
259         DmaapSimProvider.setInstance(null);
260         assertNull(DmaapSimProvider.getInstance());
261     }
262
263
264     public class MyProvider extends DmaapSimProvider {
265
266         public MyProvider(DmaapSimParameterGroup params) {
267             super(params);
268         }
269
270         @Override
271         protected ScheduledExecutorService makeTimerPool() {
272             return timer;
273         }
274
275         @Override
276         protected TopicData makeTopicData(String topicName) {
277             switch (topicName) {
278                 case TOPIC1:
279                     return data1;
280                 case TOPIC2:
281                     return data2;
282                 default:
283                     throw new IllegalArgumentException("unknown topic name: " + topicName);
284             }
285         }
286     }
287 }