Java 17 Upgrade
[policy/models.git] / models-sim / models-sim-dmaap / src / test / java / org / onap / policy / models / sim / dmaap / provider / DmaapSimProviderTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
4  * Modifications Copyright (C) 2023 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * ============LICENSE_END=========================================================
18  */
19
20 package org.onap.policy.models.sim.dmaap.provider;
21
22 import static org.assertj.core.api.Assertions.assertThatCode;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertNull;
26 import static org.junit.Assert.assertSame;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.anyInt;
29 import static org.mockito.ArgumentMatchers.anyLong;
30 import static org.mockito.ArgumentMatchers.eq;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.times;
33 import static org.mockito.Mockito.verify;
34 import static org.mockito.Mockito.when;
35
36 import jakarta.ws.rs.core.Response;
37 import jakarta.ws.rs.core.Response.Status;
38 import java.util.Arrays;
39 import java.util.Collections;
40 import java.util.List;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.ScheduledExecutorService;
44 import java.util.concurrent.TimeUnit;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.Captor;
51 import org.mockito.Mock;
52 import org.mockito.junit.MockitoJUnitRunner;
53 import org.onap.policy.common.utils.coder.CoderException;
54 import org.onap.policy.common.utils.coder.StandardCoder;
55 import org.onap.policy.common.utils.coder.StandardCoderObject;
56 import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
57
58 @RunWith(MockitoJUnitRunner.class)
59 public class DmaapSimProviderTest {
60     private static final String EXPECTED_EXCEPTION = "expected exception";
61     private static final long SWEEP_SEC = 10L;
62     private static final String TOPIC1 = "topic-A";
63     private static final String TOPIC2 = "topic-B";
64     private static final String CONSUMER1 = "consumer-X";
65     private static final String CONSUMER_ID1 = "id1";
66
67     private MyProvider prov;
68
69     @Mock
70     private DmaapSimParameterGroup params;
71
72     @Mock
73     private ScheduledExecutorService timer;
74
75     @Mock
76     private TopicData data1;
77
78     @Mock
79     private TopicData data2;
80
81     @Captor
82     private ArgumentCaptor<List<Object>> listCaptor;
83
84     @Captor
85     private ArgumentCaptor<List<Object>> listCaptor2;
86
87     /**
88      * Sets up.
89      */
90     @Before
91     public void setUp() {
92         when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
93
94         prov = new MyProvider(params);
95     }
96
97     /**
98      * Shuts down the provider, if it's running.
99      */
100     @After
101     public void tearDown() {
102         if (prov.isAlive()) {
103             prov.shutdown();
104         }
105     }
106
107     /**
108      * Verifies that the constructor adds all the expected actions to the service
109      * manager container.
110      */
111     @Test
112     public void testDmaapSimProvider() {
113         prov.start();
114         verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
115
116         prov.stop();
117         verify(timer).shutdown();
118     }
119
120     @Test
121     public void testProcessDmaapMessagePut_List() throws CoderException {
122         prov = spy(new MyProvider(params));
123
124         when(data1.write(any())).thenReturn(2);
125
126         // force topics to exist
127         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
128         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
129
130         List<Object> lst = Arrays.asList("hello", "world");
131         Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
132         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
133         StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
134         assertEquals("2", sco.getString("count"));
135
136         List<Object> lst2 = Arrays.asList("helloB", "worldB");
137         prov.processDmaapMessagePut(TOPIC1, lst2);
138         prov.processDmaapMessagePut(TOPIC2, lst2);
139
140         // should only invoke this once for each topic
141         verify(prov).makeTopicData(TOPIC1);
142         verify(prov).makeTopicData(TOPIC2);
143
144         // should process all writes
145         verify(data1).write(lst);
146         verify(data1).write(lst2);
147
148         verify(data2).write(lst2);
149     }
150
151     @Test
152     public void testProcessDmaapMessagePut_Single() throws CoderException {
153         prov = spy(new MyProvider(params));
154
155         // force topics to exist
156         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
157         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
158
159         final String value1 = "abc";
160         Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
161         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
162
163         // ensure that the response can be decoded
164         new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
165
166         final String value2 = "def";
167         prov.processDmaapMessagePut(TOPIC1, value2);
168         prov.processDmaapMessagePut(TOPIC2, value2);
169
170         // should only invoke this once for each topic
171         verify(prov).makeTopicData(TOPIC1);
172         verify(prov).makeTopicData(TOPIC2);
173
174         // should process all writes as singleton lists
175         verify(data1, times(2)).write(listCaptor.capture());
176         assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
177         assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
178
179         verify(data2).write(listCaptor2.capture());
180         assertEquals(Collections.singletonList(value2), listCaptor2.getAllValues().get(0));
181     }
182
183     @Test
184     public void testProcessDmaapMessageGet() throws InterruptedException {
185         List<String> msgs = Arrays.asList("400", "500");
186         when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
187
188         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
189         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
190         assertEquals(msgs.toString(), resp.getEntity().toString());
191     }
192
193     @Test
194     public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
195         when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
196
197         Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
198         assertEquals(Status.OK.getStatusCode(), resp.getStatus());
199         assertEquals("[]", resp.getEntity().toString());
200     }
201
202     @Test
203     public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
204         BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
205
206         // put in a background thread, so it doesn't interrupt the tester thread
207         new Thread(() -> {
208             try {
209                 when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
210                 respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
211             } catch (InterruptedException e) {
212                 Thread.currentThread().interrupt();
213             }
214         }).start();
215
216         Response resp = respQueue.poll(3, TimeUnit.SECONDS);
217         assertNotNull(resp);
218
219         assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
220         assertEquals("[]", resp.getEntity().toString());
221     }
222
223     @Test
224     public void testSweepTopicTaskRun() {
225         prov.start();
226         prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
227         prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
228
229         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
230         verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
231
232         captor.getValue().run();
233         verify(data1).removeIdleConsumers();
234         verify(data2).removeIdleConsumers();
235
236         // run it again
237         captor.getValue().run();
238         verify(data1, times(2)).removeIdleConsumers();
239         verify(data2, times(2)).removeIdleConsumers();
240     }
241
242     @Test
243     public void testMakeTimerPool() {
244         // use a real provider, so we can test the real makeTimer() method
245         DmaapSimProvider prov2 = new DmaapSimProvider(params);
246         prov2.start();
247         assertThatCode(prov2::stop).doesNotThrowAnyException();
248     }
249
250     @Test
251     public void testMakeTopicData() {
252         // use a real provider, so we can test the real makeTopicData() method
253         DmaapSimProvider prov2 = new DmaapSimProvider(params);
254         assertThatCode(() -> prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0))
255                         .doesNotThrowAnyException();
256     }
257
258     @Test
259     public void testGetInstance_testSetInstance() {
260         DmaapSimProvider.setInstance(prov);
261         assertSame(prov, DmaapSimProvider.getInstance());
262
263         DmaapSimProvider.setInstance(null);
264         assertNull(DmaapSimProvider.getInstance());
265     }
266
267
268     public class MyProvider extends DmaapSimProvider {
269
270         public MyProvider(DmaapSimParameterGroup params) {
271             super(params);
272         }
273
274         @Override
275         protected ScheduledExecutorService makeTimerPool() {
276             return timer;
277         }
278
279         @Override
280         protected TopicData makeTopicData(String topicName) {
281             return switch (topicName) {
282                 case TOPIC1 -> data1;
283                 case TOPIC2 -> data2;
284                 default -> throw new IllegalArgumentException("unknown topic name: " + topicName);
285             };
286         }
287     }
288 }