3d72f2e4ccbeef2128fa33f04ab134cd394f581c
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProviderTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.models.sim.dmaap.provider;
20
21 import static org.assertj.core.api.Assertions.assertThatCode;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertSame;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.ArgumentMatchers.anyInt;
28 import static org.mockito.ArgumentMatchers.anyLong;
29 import static org.mockito.ArgumentMatchers.eq;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.LinkedBlockingQueue;
40 import java.util.concurrent.ScheduledExecutorService;
41 import java.util.concurrent.TimeUnit;
42 import javax.ws.rs.core.Response;
43 import javax.ws.rs.core.Response.Status;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.mockito.ArgumentCaptor;
49 import org.mockito.Captor;
50 import org.mockito.Mock;
51 import org.mockito.junit.MockitoJUnitRunner;
52 import org.onap.policy.common.utils.coder.CoderException;
53 import org.onap.policy.common.utils.coder.StandardCoder;
54 import org.onap.policy.common.utils.coder.StandardCoderObject;
55 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
56
57 @RunWith(MockitoJUnitRunner.class)
58 public class DmaapSimProviderTest {
59     private static final String EXPECTED_EXCEPTION = "expected exception";
60     private static final long SWEEP_SEC = 10L;
61     private static final String TOPIC1 = "topic-A";
62     private static final String TOPIC2 = "topic-B";
63     private static final String CONSUMER1 = "consumer-X";
64     private static final String CONSUMER_ID1 = "id1";
65
66     private MyProvider prov;
67
68     @Mock
69     private DmaapSimParameterGroup params;
70
71     @Mock
72     private ScheduledExecutorService timer;
73
74     @Mock
75     private TopicData data1;
76
77     @Mock
78     private TopicData data2;
79
80     @Captor
81     private ArgumentCaptor<List<Object>> listCaptor;
82
83     @Captor
84     private ArgumentCaptor<List<Object>> listCaptor2;
85
86     /**
87      * Sets up.
88      */
89     @Before
90     public void setUp() {
91         when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
92
93         prov = new MyProvider(params);
94     }
95
96     /**
97      * Shuts down the provider, if it's running.
98      */
99     @After
100     public void tearDown() {
101         if (prov.isAlive()) {
102             prov.shutdown();
103         }
104     }
105
106     /**
107      * Verifies that the constructor adds all of the expected actions to the service
108      * manager container.
109      */
110     @Test
111     public void testDmaapSimProvider() {
112         prov.start();
113         verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
114
115         prov.stop();
116         verify(timer).shutdown();
117     }
118
119     @Test
120     public void testProcessDmaapMessagePut_List() throws CoderException {
121         prov = spy(new MyProvider(params));
122
123         when(data1.write(any())).thenReturn(2);
124
125         // force topics to exist
126         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
127         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
128
129         List<Object> lst = Arrays.asList("hello", "world");
130         Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
131         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
132         StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
133         assertEquals("2", sco.getString("count"));
134
135         List<Object> lst2 = Arrays.asList("helloB", "worldB");
136         prov.processDmaapMessagePut(TOPIC1, lst2);
137         prov.processDmaapMessagePut(TOPIC2, lst2);
138
139         // should only invoke this once for each topic
140         verify(prov).makeTopicData(TOPIC1);
141         verify(prov).makeTopicData(TOPIC2);
142
143         // should process all writes
144         verify(data1).write(lst);
145         verify(data1).write(lst2);
146
147         verify(data2).write(lst2);
148     }
149
150     @Test
151     public void testProcessDmaapMessagePut_Single() throws CoderException {
152         prov = spy(new MyProvider(params));
153
154         // force topics to exist
155         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
156         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
157
158         final String value1 = "abc";
159         Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
160         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
161
162         // ensure that the response can be decoded
163         new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
164
165         final String value2 = "def";
166         prov.processDmaapMessagePut(TOPIC1, value2);
167         prov.processDmaapMessagePut(TOPIC2, value2);
168
169         // should only invoke this once for each topic
170         verify(prov).makeTopicData(TOPIC1);
171         verify(prov).makeTopicData(TOPIC2);
172
173         // should process all writes as singleton lists
174         verify(data1, times(2)).write(listCaptor.capture());
175         assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
176         assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
177
178         verify(data2).write(listCaptor2.capture());
179         assertEquals(Collections.singletonList(value2), listCaptor2.getAllValues().get(0));
180     }
181
182     @Test
183     public void testProcessDmaapMessageGet() throws InterruptedException {
184         List<String> msgs = Arrays.asList("400", "500");
185         when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
186
187         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
188         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
189         assertEquals(msgs.toString(), resp.getEntity().toString());
190     }
191
192     @Test
193     public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
194         when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
195
196         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
197         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
198         assertEquals("[]", resp.getEntity().toString());
199     }
200
201     @Test
202     public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
203         BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
204
205         // put in a background thread so it doesn't interrupt the tester thread
206         new Thread(() -> {
207             try {
208                 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
209                 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
210             } catch (InterruptedException e) {
211                 Thread.currentThread().interrupt();
212             }
213         }).start();
214
215         Response resp = respQueue.poll(3, TimeUnit.SECONDS);
216         assertNotNull(resp);
217
218         assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
219         assertEquals("[]", resp.getEntity().toString());
220     }
221
222     @Test
223     public void testSweepTopicTaskRun() {
224         prov.start();
225         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
226         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
227
228         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
229         verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
230
231         captor.getValue().run();
232         verify(data1).removeIdleConsumers();
233         verify(data2).removeIdleConsumers();
234
235         // run it again
236         captor.getValue().run();
237         verify(data1, times(2)).removeIdleConsumers();
238         verify(data2, times(2)).removeIdleConsumers();
239     }
240
241     @Test
242     public void testMakeTimerPool() {
243         // use a real provider so we can test the real makeTimer() method
244         DmaapSimProvider prov2 = new DmaapSimProvider(params);
245         prov2.start();
246         assertThatCode(() -> prov2.stop()).doesNotThrowAnyException();
247     }
248
249     @Test
250     public void testMakeTopicData() {
251         // use a real provider so we can test the real makeTopicData() method
252         DmaapSimProvider prov2 = new DmaapSimProvider(params);
253         assertThatCode(() -> prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0))
254                         .doesNotThrowAnyException();
255     }
256
257     @Test
258     public void testGetInstance_testSetInstance() {
259         DmaapSimProvider.setInstance(prov);
260         assertSame(prov, DmaapSimProvider.getInstance());
261
262         DmaapSimProvider.setInstance(null);
263         assertNull(DmaapSimProvider.getInstance());
264     }
265
266
267     public class MyProvider extends DmaapSimProvider {
268
269         public MyProvider(DmaapSimParameterGroup params) {
270             super(params);
271         }
272
273         @Override
274         protected ScheduledExecutorService makeTimerPool() {
275             return timer;
276         }
277
278         @Override
279         protected TopicData makeTopicData(String topicName) {
280             switch (topicName) {
281                 case TOPIC1:
282                     return data1;
283                 case TOPIC2:
284                     return data2;
285                 default:
286                     throw new IllegalArgumentException("unknown topic name: " + topicName);
287             }
288         }
289     }
290 }