Remove deprecated methods from models
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProviderTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.models.sim.dmaap.provider;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertSame;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyInt;
27 import static org.mockito.ArgumentMatchers.anyLong;
28 import static org.mockito.ArgumentMatchers.eq;
29 import static org.mockito.Mockito.spy;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import javax.ws.rs.core.Response;
42 import javax.ws.rs.core.Response.Status;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.Captor;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.common.utils.coder.StandardCoderObject;
53 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
54
55 public class DmaapSimProviderTest {
56     private static final String EXPECTED_EXCEPTION = "expected exception";
57     private static final long SWEEP_SEC = 10L;
58     private static final String TOPIC1 = "topic-A";
59     private static final String TOPIC2 = "topic-B";
60     private static final String CONSUMER1 = "consumer-X";
61     private static final String CONSUMER_ID1 = "id1";
62
63     private MyProvider prov;
64
65     @Mock
66     private DmaapSimParameterGroup params;
67
68     @Mock
69     private ScheduledExecutorService timer;
70
71     @Mock
72     private TopicData data1;
73
74     @Mock
75     private TopicData data2;
76
77     @Captor
78     private ArgumentCaptor<List<Object>> listCaptor;
79
80     @Captor
81     private ArgumentCaptor<List<Object>> listCaptor2;
82
83     /**
84      * Sets up.
85      */
86     @Before
87     public void setUp() {
88         MockitoAnnotations.initMocks(this);
89
90         when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
91
92         prov = new MyProvider(params);
93     }
94
95     /**
96      * Shuts down the provider, if it's running.
97      */
98     @After
99     public void tearDown() {
100         if (prov.isAlive()) {
101             prov.shutdown();
102         }
103     }
104
105     /**
106      * Verifies that the constructor adds all of the expected actions to the service
107      * manager container.
108      */
109     @Test
110     public void testDmaapSimProvider() {
111         prov.start();
112         verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
113
114         prov.stop();
115         verify(timer).shutdown();
116     }
117
118     @Test
119     public void testProcessDmaapMessagePut_List() throws CoderException {
120         prov = spy(new MyProvider(params));
121
122         when(data1.write(any())).thenReturn(2);
123
124         // force topics to exist
125         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
126         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
127
128         List<Object> lst = Arrays.asList("hello", "world");
129         Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
130         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
131         StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
132         assertEquals("2", sco.getString("count"));
133
134         List<Object> lst2 = Arrays.asList("helloB", "worldB");
135         prov.processDmaapMessagePut(TOPIC1, lst2);
136         prov.processDmaapMessagePut(TOPIC2, lst2);
137
138         // should only invoke this once for each topic
139         verify(prov).makeTopicData(TOPIC1);
140         verify(prov).makeTopicData(TOPIC2);
141
142         // should process all writes
143         verify(data1).write(lst);
144         verify(data1).write(lst2);
145
146         verify(data2).write(lst2);
147     }
148
149     @Test
150     public void testProcessDmaapMessagePut_Single() throws CoderException {
151         prov = spy(new MyProvider(params));
152
153         // force topics to exist
154         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
155         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
156
157         final String value1 = "abc";
158         Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
159         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
160
161         // ensure that the response can be decoded
162         new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
163
164         final String value2 = "def";
165         prov.processDmaapMessagePut(TOPIC1, value2);
166         prov.processDmaapMessagePut(TOPIC2, value2);
167
168         // should only invoke this once for each topic
169         verify(prov).makeTopicData(TOPIC1);
170         verify(prov).makeTopicData(TOPIC2);
171
172         // should process all writes as singleton lists
173         verify(data1, times(2)).write(listCaptor.capture());
174         assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
175         assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
176
177         verify(data2).write(listCaptor2.capture());
178         assertEquals(Collections.singletonList(value2), listCaptor2.getAllValues().get(0));
179     }
180
181     @Test
182     public void testProcessDmaapMessageGet() throws InterruptedException {
183         List<String> msgs = Arrays.asList("400", "500");
184         when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
185
186         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
187         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
188         assertEquals(msgs.toString(), resp.getEntity().toString());
189     }
190
191     @Test
192     public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
193         when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
194
195         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
196         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
197         assertEquals("[]", resp.getEntity().toString());
198     }
199
200     @Test
201     public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
202         BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
203
204         // put in a background thread so it doesn't interrupt the tester thread
205         new Thread(() -> {
206             try {
207                 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
208                 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
209             } catch (InterruptedException e) {
210                 Thread.currentThread().interrupt();
211             }
212         }).start();
213
214         Response resp = respQueue.poll(3, TimeUnit.SECONDS);
215         assertNotNull(resp);
216
217         assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
218         assertEquals("[]", resp.getEntity().toString());
219     }
220
221     @Test
222     public void testSweepTopicTaskRun() {
223         prov.start();
224         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
225         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
226
227         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
228         verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
229
230         captor.getValue().run();
231         verify(data1).removeIdleConsumers();
232         verify(data2).removeIdleConsumers();
233
234         // run it again
235         captor.getValue().run();
236         verify(data1, times(2)).removeIdleConsumers();
237         verify(data2, times(2)).removeIdleConsumers();
238     }
239
240     @Test
241     public void testMakeTimerPool() {
242         // use a real provider so we can test the real makeTimer() method
243         DmaapSimProvider prov2 = new DmaapSimProvider(params);
244         prov2.start();
245         prov2.stop();
246     }
247
248     @Test
249     public void testMakeTopicData() {
250         // use a real provider so we can test the real makeTopicData() method
251         DmaapSimProvider prov2 = new DmaapSimProvider(params);
252         prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
253     }
254
255     @Test
256     public void testGetInstance_testSetInstance() {
257         DmaapSimProvider.setInstance(prov);
258         assertSame(prov, DmaapSimProvider.getInstance());
259
260         DmaapSimProvider.setInstance(null);
261         assertNull(DmaapSimProvider.getInstance());
262     }
263
264
265     public class MyProvider extends DmaapSimProvider {
266
267         public MyProvider(DmaapSimParameterGroup params) {
268             super(params);
269         }
270
271         @Override
272         protected ScheduledExecutorService makeTimerPool() {
273             return timer;
274         }
275
276         @Override
277         protected TopicData makeTopicData(String topicName) {
278             switch (topicName) {
279                 case TOPIC1:
280                     return data1;
281                 case TOPIC2:
282                     return data2;
283                 default:
284                     throw new IllegalArgumentException("unknown topic name: " + topicName);
285             }
286         }
287     }
288 }