policy/models jdk 11 upgrades
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProviderTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.models.sim.dmaap.provider;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertSame;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyInt;
27 import static org.mockito.ArgumentMatchers.anyLong;
28 import static org.mockito.ArgumentMatchers.eq;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import javax.ws.rs.core.Response;
42 import javax.ws.rs.core.Response.Status;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Ignore;
46 import org.junit.Test;
47 import org.mockito.ArgumentCaptor;
48 import org.mockito.Captor;
49 import org.mockito.Mock;
50 import org.mockito.MockitoAnnotations;
51 import org.onap.policy.common.utils.coder.CoderException;
52 import org.onap.policy.common.utils.coder.StandardCoder;
53 import org.onap.policy.common.utils.coder.StandardCoderObject;
54 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
55
56 public class DmaapSimProviderTest {
57     private static final String EXPECTED_EXCEPTION = "expected exception";
58     private static final long SWEEP_SEC = 10L;
59     private static final String TOPIC1 = "topic-A";
60     private static final String TOPIC2 = "topic-B";
61     private static final String CONSUMER1 = "consumer-X";
62     private static final String CONSUMER_ID1 = "id1";
63
64     private MyProvider prov;
65
66     @Mock
67     private DmaapSimParameterGroup params;
68
69     @Mock
70     private ScheduledExecutorService timer;
71
72     @Mock
73     private TopicData data1;
74
75     @Mock
76     private TopicData data2;
77
78     @Captor
79     private ArgumentCaptor<List<Object>> listCaptor;
80
81     /**
82      * Sets up.
83      */
84     @Before
85     public void setUp() {
86         MockitoAnnotations.initMocks(this);
87
88         when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
89
90         prov = new MyProvider(params);
91     }
92
93     /**
94      * Shuts down the provider, if it's running.
95      */
96     @After
97     public void tearDown() {
98         if (prov.isAlive()) {
99             prov.shutdown();
100         }
101     }
102
103     /**
104      * Verifies that the constructor adds all of the expected actions to the service
105      * manager container.
106      */
107     @Test
108     public void testDmaapSimProvider() {
109         prov.start();
110         verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
111
112         prov.stop();
113         verify(timer).shutdown();
114     }
115
116     @Test
117     public void testProcessDmaapMessagePut_List() throws CoderException {
118         prov = spy(new MyProvider(params));
119
120         when(data1.write(any())).thenReturn(2);
121
122         // force topics to exist
123         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
124         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
125
126         List<Object> lst = Arrays.asList("hello", "world");
127         Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
128         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
129         StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
130         assertEquals("2", sco.getString("count"));
131
132         List<Object> lst2 = Arrays.asList("helloB", "worldB");
133         prov.processDmaapMessagePut(TOPIC1, lst2);
134         prov.processDmaapMessagePut(TOPIC2, lst2);
135
136         // should only invoke this once for each topic
137         verify(prov).makeTopicData(TOPIC1);
138         verify(prov).makeTopicData(TOPIC2);
139
140         // should process all writes
141         verify(data1).write(lst);
142         verify(data1).write(lst2);
143
144         verify(data2).write(lst2);
145     }
146
147     @Test
148     @Ignore
149     public void testProcessDmaapMessagePut_Single() throws CoderException {
150         prov = spy(new MyProvider(params));
151
152         // force topics to exist
153         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
154         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
155
156         final String value1 = "abc";
157         Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
158         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
159
160         // ensure that the response can be decoded
161         new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
162
163         final String value2 = "def";
164         prov.processDmaapMessagePut(TOPIC1, value2);
165         prov.processDmaapMessagePut(TOPIC2, value2);
166
167         // should only invoke this once for each topic
168         verify(prov).makeTopicData(TOPIC1);
169         verify(prov).makeTopicData(TOPIC2);
170
171         // should process all writes as singleton lists
172         listCaptor.getAllValues().clear();
173         verify(data1, times(2)).write(listCaptor.capture());
174         assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
175         assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
176
177         listCaptor.getAllValues().clear();
178         verify(data2).write(listCaptor.capture());
179         assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(0));
180     }
181
182     @Test
183     public void testProcessDmaapMessageGet() throws InterruptedException {
184         List<String> msgs = Arrays.asList("400", "500");
185         when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
186
187         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
188         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
189         assertEquals(msgs.toString(), resp.getEntity().toString());
190     }
191
192     @Test
193     public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
194         when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
195
196         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
197         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
198         assertEquals("[]", resp.getEntity().toString());
199     }
200
201     @Test
202     public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
203         BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
204
205         // put in a background thread so it doesn't interrupt the tester thread
206         new Thread(() -> {
207             try {
208                 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
209                 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
210             } catch (InterruptedException e) {
211                 Thread.currentThread().interrupt();
212             }
213         }).start();
214
215         Response resp = respQueue.poll(3, TimeUnit.SECONDS);
216         assertNotNull(resp);
217
218         assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
219         assertEquals("[]", resp.getEntity().toString());
220     }
221
222     @Test
223     public void testSweepTopicTaskRun() {
224         prov.start();
225         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
226         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
227
228         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
229         verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
230
231         captor.getValue().run();
232         verify(data1).removeIdleConsumers();
233         verify(data2).removeIdleConsumers();
234
235         // run it again
236         captor.getValue().run();
237         verify(data1, times(2)).removeIdleConsumers();
238         verify(data2, times(2)).removeIdleConsumers();
239     }
240
241     @Test
242     public void testMakeTimerPool() {
243         // use a real provider so we can test the real makeTimer() method
244         DmaapSimProvider prov2 = new DmaapSimProvider(params);
245         prov2.start();
246         prov2.stop();
247     }
248
249     @Test
250     public void testMakeTopicData() {
251         // use a real provider so we can test the real makeTopicData() method
252         DmaapSimProvider prov2 = new DmaapSimProvider(params);
253         prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
254     }
255
256     @Test
257     public void testGetInstance_testSetInstance() {
258         DmaapSimProvider.setInstance(prov);
259         assertSame(prov, DmaapSimProvider.getInstance());
260
261         DmaapSimProvider.setInstance(null);
262         assertNull(DmaapSimProvider.getInstance());
263     }
264
265
266     public class MyProvider extends DmaapSimProvider {
267
268         public MyProvider(DmaapSimParameterGroup params) {
269             super(params);
270         }
271
272         @Override
273         protected ScheduledExecutorService makeTimerPool() {
274             return timer;
275         }
276
277         @Override
278         protected TopicData makeTopicData(String topicName) {
279             switch (topicName) {
280                 case TOPIC1:
281                     return data1;
282                 case TOPIC2:
283                     return data2;
284                 default:
285                     throw new IllegalArgumentException("unknown topic name: " + topicName);
286             }
287         }
288     }
289 }