86b32e69f43e357d3ef1d641b47befb26f5964a9
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertThrows;
30 import static org.junit.Assert.fail;
31 import static org.mockito.Mockito.mock;
32 import static org.mockito.Mockito.when;
33
34 import com.att.nsa.cambria.client.CambriaConsumer;
35 import java.io.IOException;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.List;
39 import java.util.Properties;
40 import java.util.concurrent.CountDownLatch;
41 import org.apache.commons.collections4.IteratorUtils;
42 import org.apache.kafka.clients.consumer.ConsumerConfig;
43 import org.apache.kafka.clients.consumer.KafkaConsumer;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
47 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
48 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
49 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
50 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
51 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
52 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
53 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
54 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
55 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
56 import org.springframework.test.util.ReflectionTestUtils;
57
58 public class BusConsumerTest extends TopicTestBase {
59
60     private static final int SHORT_TIMEOUT_MILLIS = 10;
61     private static final int LONG_TIMEOUT_MILLIS = 3000;
62
63     @Before
64     @Override
65     public void setUp() {
66         super.setUp();
67     }
68
69     @Test
70     public void testFetchingBusConsumer() {
71         // should not be negative
72         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
73         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
74
75         // should not be zero
76         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
77         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
78
79         // should not be too large
80         cons = new FetchingBusConsumerImpl(
81                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
82         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
83
84         // should not be what was specified
85         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
86         assertThat(cons.getSleepTime()).isEqualTo(100);
87     }
88
89     @Test
90     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
91
92         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
93
94             private CountDownLatch started = new CountDownLatch(1);
95
96             @Override
97             protected void sleepAfterFetchFailure() {
98                 started.countDown();
99                 super.sleepAfterFetchFailure();
100             }
101         };
102
103         // full sleep
104         long tstart = System.currentTimeMillis();
105         cons.sleepAfterFetchFailure();
106         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
107
108         // close while sleeping - sleep should halt prematurely
109         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
110         cons.started = new CountDownLatch(1);
111         Thread thread = new Thread(cons::sleepAfterFetchFailure);
112         tstart = System.currentTimeMillis();
113         thread.start();
114         cons.started.await();
115         cons.close();
116         thread.join();
117         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
118
119         // interrupt while sleeping - sleep should halt prematurely
120         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
121         cons.started = new CountDownLatch(1);
122         thread = new Thread(cons::sleepAfterFetchFailure);
123         tstart = System.currentTimeMillis();
124         thread.start();
125         cons.started.await();
126         thread.interrupt();
127         thread.join();
128         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
129     }
130
131     @Test
132     public void testCambriaConsumerWrapper() {
133         // verify that different wrappers can be built
134         new CambriaConsumerWrapper(makeBuilder().build());
135         new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
136         new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
137         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
138         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
139         new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
140         new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
141         new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
142         new CambriaConsumerWrapper(makeBuilder().userName(null).build());
143         new CambriaConsumerWrapper(makeBuilder().password(null).build());
144
145         assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
146                         .doesNotThrowAnyException();
147     }
148
149     @Test
150     public void testCambriaConsumerWrapperFetch() throws Exception {
151         CambriaConsumer inner = mock(CambriaConsumer.class);
152         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
153         when(inner.fetch()).thenReturn(lst);
154
155         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
156         ReflectionTestUtils.setField(cons, "consumer", inner);
157
158         assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
159
160         // arrange to throw exception next time fetch is called
161         IOException ex = new IOException(EXPECTED);
162         when(inner.fetch()).thenThrow(ex);
163
164         cons.fetchTimeout = 10;
165
166         try {
167             cons.fetch();
168             fail("missing exception");
169
170         } catch (IOException e) {
171             assertEquals(ex, e);
172         }
173     }
174
175     @Test
176     public void testCambriaConsumerWrapperClose() {
177         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
178         assertThatCode(cons::close).doesNotThrowAnyException();
179     }
180
181     @Test
182     public void testCambriaConsumerWrapperToString() {
183         assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
184     }
185
186     @Test
187     public void testDmaapConsumerWrapper() {
188         // verify that different wrappers can be built
189         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
190     }
191
192     @Test(expected = IllegalArgumentException.class)
193     public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
194         new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
195     }
196
197     @Test
198     public void testDmaapConsumerWrapperFetch() throws Exception {
199         DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
200         MRConsumerImpl cons = mock(MRConsumerImpl.class);
201
202         dmaap.fetchTimeout = 5;
203         dmaap.consumer = cons;
204
205         // null return
206         when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
207         assertFalse(dmaap.fetch().iterator().hasNext());
208
209         // with messages, 200
210         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
211         MRConsumerResponse resp = new MRConsumerResponse();
212         resp.setResponseCode("200");
213         resp.setActualMessages(lst);
214         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
215
216         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
217
218         // null messages
219         resp.setActualMessages(null);
220         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
221
222         assertFalse(dmaap.fetch().iterator().hasNext());
223
224         // with messages, NOT 200
225         resp.setResponseCode("400");
226         resp.setActualMessages(lst);
227         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
228
229         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
230     }
231
232     @Test
233     public void testDmaapConsumerWrapperClose() {
234         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
235     }
236
237     @Test
238     public void testDmaapConsumerWrapperToString() throws Exception {
239         assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
240     }
241
242     @Test
243     public void testDmaapAafConsumerWrapper() throws Exception {
244         // verify that different wrappers can be built
245         new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
246         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
247                         .doesNotThrowAnyException();
248     }
249
250     @Test(expected = IllegalArgumentException.class)
251     public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
252         /*
253          * Unfortunately, the MR code intercepts this and throws an exception before the
254          * wrapper gets a chance to check it, thus this test does not improve the coverage
255          * for the constructor.
256          */
257         new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
258     }
259
260     @Test
261     public void testDmaapAafConsumerWrapperToString() throws Exception {
262         assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
263     }
264
265     @Test
266     public void testDmaapDmeConsumerWrapper() throws Exception {
267         // verify that different wrappers can be built
268         new DmaapDmeConsumerWrapper(makeBuilder().build());
269         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
270         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
271         new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
272
273         addProps.put(ROUTE_PROP, MY_ROUTE);
274         new DmaapDmeConsumerWrapper(makeBuilder().build());
275         assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
276                         .doesNotThrowAnyException();
277     }
278
279     @Test(expected = IllegalArgumentException.class)
280     public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
281         new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
282     }
283
284     @Test(expected = IllegalArgumentException.class)
285     public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
286         new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
287     }
288
289     @Test(expected = IllegalArgumentException.class)
290     public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
291         new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
292     }
293
294     @Test(expected = IllegalArgumentException.class)
295     public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
296         new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
297     }
298
299     @Test(expected = IllegalArgumentException.class)
300     public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
301         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
302     }
303
304     @Test
305     public void testKafkaConsumerWrapper() {
306         // verify that different wrappers can be built
307         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
308     }
309
310     @Test(expected = IllegalArgumentException.class)
311     public void testKafkaConsumerWrapper_InvalidTopic() {
312         new KafkaConsumerWrapper(makeBuilder().topic(null).build());
313     }
314
315     @Test
316     public void testKafkaConsumerWrapperFetch() {
317
318         //Setup Properties for consumer
319         Properties kafkaProps = new Properties();
320         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
321         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
322         kafkaProps.setProperty("enable.auto.commit", "true");
323         kafkaProps.setProperty("auto.commit.interval.ms", "1000");
324         kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
325             "org.apache.kafka.common.serialization.StringDeserializer");
326         kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
327             "org.apache.kafka.common.serialization.StringDeserializer");
328         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
329         kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
330
331         KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
332         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
333         kafka.consumer = consumer;
334
335         assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
336         consumer.close();
337     }
338
339     @Test
340     public void testKafkaConsumerWrapperClose() {
341         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
342     }
343
344     @Test
345     public void testKafkaConsumerWrapperToString() {
346         assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
347     }
348
349     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
350
351         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
352             super(busTopicParams);
353         }
354
355         @Override
356         public Iterable<String> fetch() {
357             return null;
358         }
359     }
360 }