a95e773d7dd20dfc4f286a695aea82d3213af737
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertThrows;
30 import static org.junit.Assert.fail;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.times;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
36
37 import com.att.nsa.cambria.client.CambriaConsumer;
38 import java.io.IOException;
39 import java.nio.charset.StandardCharsets;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Properties;
46 import java.util.concurrent.CountDownLatch;
47 import org.apache.commons.collections4.IteratorUtils;
48 import org.apache.kafka.clients.consumer.ConsumerConfig;
49 import org.apache.kafka.clients.consumer.ConsumerRecord;
50 import org.apache.kafka.clients.consumer.ConsumerRecords;
51 import org.apache.kafka.clients.consumer.KafkaConsumer;
52 import org.apache.kafka.common.TopicPartition;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.mockito.Mock;
56 import org.mockito.MockitoAnnotations;
57 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
58 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
59 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
60 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
61 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
62 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
63 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
64 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
65 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
66 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
67 import org.springframework.test.util.ReflectionTestUtils;
68
69 public class BusConsumerTest extends TopicTestBase {
70
71     private static final int SHORT_TIMEOUT_MILLIS = 10;
72     private static final int LONG_TIMEOUT_MILLIS = 3000;
73
74     @Mock
75     KafkaConsumer<String, String> mockedKafkaConsumer;
76
77     @Before
78     @Override
79     public void setUp() {
80         super.setUp();
81         MockitoAnnotations.initMocks(this);
82     }
83
84
85     @Test
86     public void testFetchingBusConsumer() {
87         // should not be negative
88         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
89         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
90
91         // should not be zero
92         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
93         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
94
95         // should not be too large
96         cons = new FetchingBusConsumerImpl(
97                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
98         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
99
100         // should not be what was specified
101         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
102         assertThat(cons.getSleepTime()).isEqualTo(100);
103     }
104
105     @Test
106     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
107
108         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
109
110             private CountDownLatch started = new CountDownLatch(1);
111
112             @Override
113             protected void sleepAfterFetchFailure() {
114                 started.countDown();
115                 super.sleepAfterFetchFailure();
116             }
117         };
118
119         // full sleep
120         long tstart = System.currentTimeMillis();
121         cons.sleepAfterFetchFailure();
122         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
123
124         // close while sleeping - sleep should halt prematurely
125         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
126         cons.started = new CountDownLatch(1);
127         Thread thread = new Thread(cons::sleepAfterFetchFailure);
128         tstart = System.currentTimeMillis();
129         thread.start();
130         cons.started.await();
131         cons.close();
132         thread.join();
133         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
134
135         // interrupt while sleeping - sleep should halt prematurely
136         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
137         cons.started = new CountDownLatch(1);
138         thread = new Thread(cons::sleepAfterFetchFailure);
139         tstart = System.currentTimeMillis();
140         thread.start();
141         cons.started.await();
142         thread.interrupt();
143         thread.join();
144         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
145     }
146
147     @Test
148     public void testCambriaConsumerWrapper() {
149         // verify that different wrappers can be built
150         new CambriaConsumerWrapper(makeBuilder().build());
151         new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
152         new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
153         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
154         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
155         new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
156         new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
157         new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
158         new CambriaConsumerWrapper(makeBuilder().userName(null).build());
159         new CambriaConsumerWrapper(makeBuilder().password(null).build());
160
161         assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
162                         .doesNotThrowAnyException();
163     }
164
165     @Test
166     public void testCambriaConsumerWrapperFetch() throws Exception {
167         CambriaConsumer inner = mock(CambriaConsumer.class);
168         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
169         when(inner.fetch()).thenReturn(lst);
170
171         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
172         ReflectionTestUtils.setField(cons, "consumer", inner);
173
174         assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
175
176         // arrange to throw exception next time fetch is called
177         IOException ex = new IOException(EXPECTED);
178         when(inner.fetch()).thenThrow(ex);
179
180         cons.fetchTimeout = 10;
181
182         try {
183             cons.fetch();
184             fail("missing exception");
185
186         } catch (IOException e) {
187             assertEquals(ex, e);
188         }
189     }
190
191     @Test
192     public void testCambriaConsumerWrapperClose() {
193         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
194         assertThatCode(cons::close).doesNotThrowAnyException();
195     }
196
197     @Test
198     public void testCambriaConsumerWrapperToString() {
199         assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
200     }
201
202     @Test
203     public void testDmaapConsumerWrapper() {
204         // verify that different wrappers can be built
205         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
206     }
207
208     @Test(expected = IllegalArgumentException.class)
209     public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
210         new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
211     }
212
213     @Test
214     public void testDmaapConsumerWrapperFetch() throws Exception {
215         DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
216         MRConsumerImpl cons = mock(MRConsumerImpl.class);
217
218         dmaap.fetchTimeout = 5;
219         dmaap.consumer = cons;
220
221         // null return
222         when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
223         assertFalse(dmaap.fetch().iterator().hasNext());
224
225         // with messages, 200
226         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
227         MRConsumerResponse resp = new MRConsumerResponse();
228         resp.setResponseCode("200");
229         resp.setActualMessages(lst);
230         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
231
232         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
233
234         // null messages
235         resp.setActualMessages(null);
236         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
237
238         assertFalse(dmaap.fetch().iterator().hasNext());
239
240         // with messages, NOT 200
241         resp.setResponseCode("400");
242         resp.setActualMessages(lst);
243         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
244
245         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
246     }
247
248     @Test
249     public void testDmaapConsumerWrapperClose() {
250         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
251     }
252
253     @Test
254     public void testDmaapConsumerWrapperToString() throws Exception {
255         assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
256     }
257
258     @Test
259     public void testDmaapAafConsumerWrapper() throws Exception {
260         // verify that different wrappers can be built
261         new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
262         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
263                         .doesNotThrowAnyException();
264     }
265
266     @Test(expected = IllegalArgumentException.class)
267     public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
268         /*
269          * Unfortunately, the MR code intercepts this and throws an exception before the
270          * wrapper gets a chance to check it, thus this test does not improve the coverage
271          * for the constructor.
272          */
273         new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
274     }
275
276     @Test
277     public void testDmaapAafConsumerWrapperToString() throws Exception {
278         assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
279     }
280
281     @Test
282     public void testDmaapDmeConsumerWrapper() throws Exception {
283         // verify that different wrappers can be built
284         new DmaapDmeConsumerWrapper(makeBuilder().build());
285         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
286         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
287         new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
288
289         addProps.put(ROUTE_PROP, MY_ROUTE);
290         new DmaapDmeConsumerWrapper(makeBuilder().build());
291         assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
292                         .doesNotThrowAnyException();
293     }
294
295     @Test(expected = IllegalArgumentException.class)
296     public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
297         new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
298     }
299
300     @Test(expected = IllegalArgumentException.class)
301     public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
302         new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
303     }
304
305     @Test(expected = IllegalArgumentException.class)
306     public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
307         new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
308     }
309
310     @Test(expected = IllegalArgumentException.class)
311     public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
312         new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
313     }
314
315     @Test(expected = IllegalArgumentException.class)
316     public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
317         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
318     }
319
320     @Test
321     public void testKafkaConsumerWrapper() {
322         // verify that different wrappers can be built
323         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
324     }
325
326     @Test(expected = IllegalArgumentException.class)
327     public void testKafkaConsumerWrapper_InvalidTopic() {
328         new KafkaConsumerWrapper(makeBuilder().topic(null).build());
329     }
330
331     @Test
332     public void testKafkaConsumerWrapperFetch() {
333
334         //Setup Properties for consumer
335         Properties kafkaProps = new Properties();
336         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
337         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
338         kafkaProps.setProperty("enable.auto.commit", "true");
339         kafkaProps.setProperty("auto.commit.interval.ms", "1000");
340         kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
341             "org.apache.kafka.common.serialization.StringDeserializer");
342         kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
343             "org.apache.kafka.common.serialization.StringDeserializer");
344         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
345         kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
346
347         KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
348         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
349         kafka.consumer = consumer;
350
351         assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
352         consumer.close();
353     }
354
355     @Test
356     public void testFetchNoMessages() throws IOException {
357         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
358         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
359
360         when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
361
362         Iterable<String> result = kafkaConsumerWrapper.fetch();
363
364         verify(mockedKafkaConsumer, times(1)).poll(any());
365
366         assertThat(result != null);
367
368         assertThat(!result.iterator().hasNext());
369
370         mockedKafkaConsumer.close();
371     }
372
373     @Test
374     public void testFetchWithMessages() {
375         // Setup
376         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
377         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
378
379         ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
380         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
381         recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
382         ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
383
384         when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
385
386         Iterable<String> result = kafkaConsumerWrapper.fetch();
387
388         verify(mockedKafkaConsumer, times(1)).poll(any());
389
390         verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
391
392         assertThat(result != null);
393
394         assertThat(result.iterator().hasNext());
395
396         assertThat(result.iterator().next().equals("value"));
397
398         mockedKafkaConsumer.close();
399     }
400
401     @Test
402     public void testFetchWithMessagesAndTraceparent() {
403         // Setup
404         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
405         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
406
407         ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
408         record.headers().add(
409                 "traceparent",
410                 "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
411         );
412
413         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
414         recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
415         ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
416
417         when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
418
419         Iterable<String> result = kafkaConsumerWrapper.fetch();
420
421         verify(mockedKafkaConsumer, times(1)).poll(any());
422
423         verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
424
425         assertThat(result != null);
426
427         assertThat(result.iterator().hasNext());
428
429         assertThat(result.iterator().next().equals("value"));
430
431         mockedKafkaConsumer.close();
432     }
433
434
435     @Test
436     public void testKafkaConsumerWrapperClose() {
437         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
438     }
439
440     @Test
441     public void testKafkaConsumerWrapperToString() {
442         assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
443     }
444
445     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
446
447         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
448             super(busTopicParams);
449         }
450
451         @Override
452         public Iterable<String> fetch() {
453             return null;
454         }
455     }
456 }