2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertThrows;
30 import static org.junit.Assert.fail;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.times;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
37 import com.att.nsa.cambria.client.CambriaConsumer;
38 import java.io.IOException;
39 import java.nio.charset.StandardCharsets;
40 import java.util.Arrays;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.List;
45 import java.util.Properties;
46 import java.util.concurrent.CountDownLatch;
47 import org.apache.commons.collections4.IteratorUtils;
48 import org.apache.kafka.clients.consumer.ConsumerConfig;
49 import org.apache.kafka.clients.consumer.ConsumerRecord;
50 import org.apache.kafka.clients.consumer.ConsumerRecords;
51 import org.apache.kafka.clients.consumer.KafkaConsumer;
52 import org.apache.kafka.common.TopicPartition;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.mockito.Mock;
56 import org.mockito.MockitoAnnotations;
57 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
58 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
59 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
60 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
61 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
62 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
63 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
64 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
65 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
66 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
67 import org.springframework.test.util.ReflectionTestUtils;
69 public class BusConsumerTest extends TopicTestBase {
71 private static final int SHORT_TIMEOUT_MILLIS = 10;
72 private static final int LONG_TIMEOUT_MILLIS = 3000;
75 KafkaConsumer<String, String> mockedKafkaConsumer;
81 MockitoAnnotations.initMocks(this);
86 public void testFetchingBusConsumer() {
87 // should not be negative
88 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
89 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
92 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
93 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
95 // should not be too large
96 cons = new FetchingBusConsumerImpl(
97 makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
98 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
100 // should not be what was specified
101 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
102 assertThat(cons.getSleepTime()).isEqualTo(100);
// Exercises sleepAfterFetchFailure(): a direct call must last at least SHORT_TIMEOUT_MILLIS,
// while the two background-thread scenarios below must finish in under LONG_TIMEOUT_MILLIS.
106 public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
// anonymous subclass adding a latch the test can wait on, and overriding the sleep method
108 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
110 private CountDownLatch started = new CountDownLatch(1);
113 protected void sleepAfterFetchFailure() {
115 super.sleepAfterFetchFailure();
// scenario 1: uninterrupted call on the test thread, timed with wall-clock millis
120 long tstart = System.currentTimeMillis();
121 cons.sleepAfterFetchFailure();
122 assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
124 // close while sleeping - sleep should halt prematurely
// switch to the long timeout and a fresh latch so an early return is distinguishable
125 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
126 cons.started = new CountDownLatch(1);
127 Thread thread = new Thread(cons::sleepAfterFetchFailure);
128 tstart = System.currentTimeMillis();
// block until the latch is released (presumably by the overridden method - TODO confirm)
130 cons.started.await();
133 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
135 // interrupt while sleeping - sleep should halt prematurely
// same preparation as the previous scenario: long timeout, new latch, new thread
136 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
137 cons.started = new CountDownLatch(1);
138 thread = new Thread(cons::sleepAfterFetchFailure);
139 tstart = System.currentTimeMillis();
141 cons.started.await();
144 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
148 public void testCambriaConsumerWrapper() {
149 // verify that different wrappers can be built
150 new CambriaConsumerWrapper(makeBuilder().build());
151 new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
152 new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
153 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
154 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
155 new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
156 new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
157 new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
158 new CambriaConsumerWrapper(makeBuilder().userName(null).build());
159 new CambriaConsumerWrapper(makeBuilder().password(null).build());
161 assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
162 .doesNotThrowAnyException();
// Drives CambriaConsumerWrapper.fetch() against a mocked CambriaConsumer: first a fetch that
// returns two messages, then a fetch stubbed to throw an IOException.
166 public void testCambriaConsumerWrapperFetch() throws Exception {
167 CambriaConsumer inner = mock(CambriaConsumer.class);
168 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
169 when(inner.fetch()).thenReturn(lst);
171 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
// replace the wrapper's "consumer" field with the mock via reflection
172 ReflectionTestUtils.setField(cons, "consumer", inner);
// the wrapper must hand back exactly the messages supplied by the mock
174 assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
176 // arrange to throw exception next time fetch is called
177 IOException ex = new IOException(EXPECTED);
178 when(inner.fetch()).thenThrow(ex);
// small timeout - presumably keeps the wrapper's wait after the failed fetch short (TODO confirm)
180 cons.fetchTimeout = 10;
// reaching fail() means the stubbed IOException was not propagated
184 fail("missing exception");
186 } catch (IOException e) {
192 public void testCambriaConsumerWrapperClose() {
193 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
194 assertThatCode(cons::close).doesNotThrowAnyException();
198 public void testCambriaConsumerWrapperToString() {
199 assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
203 public void testDmaapConsumerWrapper() {
204 // verify that different wrappers can be built
205 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
208 @Test(expected = IllegalArgumentException.class)
209 public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
210 new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
214 public void testDmaapConsumerWrapperFetch() throws Exception {
215 DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
216 MRConsumerImpl cons = mock(MRConsumerImpl.class);
218 dmaap.fetchTimeout = 5;
219 dmaap.consumer = cons;
222 when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
223 assertFalse(dmaap.fetch().iterator().hasNext());
225 // with messages, 200
226 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
227 MRConsumerResponse resp = new MRConsumerResponse();
228 resp.setResponseCode("200");
229 resp.setActualMessages(lst);
230 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
232 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
235 resp.setActualMessages(null);
236 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
238 assertFalse(dmaap.fetch().iterator().hasNext());
240 // with messages, NOT 200
241 resp.setResponseCode("400");
242 resp.setActualMessages(lst);
243 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
245 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
249 public void testDmaapConsumerWrapperClose() {
250 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
254 public void testDmaapConsumerWrapperToString() throws Exception {
255 assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
259 public void testDmaapAafConsumerWrapper() throws Exception {
260 // verify that different wrappers can be built
261 new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
262 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
263 .doesNotThrowAnyException();
266 @Test(expected = IllegalArgumentException.class)
267 public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
269 * Unfortunately, the MR code intercepts this and throws an exception before the
270 * wrapper gets a chance to check it, thus this test does not improve the coverage
271 * for the constructor.
273 new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
277 public void testDmaapAafConsumerWrapperToString() throws Exception {
278 assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
282 public void testDmaapDmeConsumerWrapper() throws Exception {
283 // verify that different wrappers can be built
284 new DmaapDmeConsumerWrapper(makeBuilder().build());
285 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
286 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
287 new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
289 addProps.put(ROUTE_PROP, MY_ROUTE);
290 new DmaapDmeConsumerWrapper(makeBuilder().build());
291 assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
292 .doesNotThrowAnyException();
295 @Test(expected = IllegalArgumentException.class)
296 public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
297 new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
300 @Test(expected = IllegalArgumentException.class)
301 public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
302 new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
305 @Test(expected = IllegalArgumentException.class)
306 public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
307 new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
310 @Test(expected = IllegalArgumentException.class)
311 public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
312 new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
315 @Test(expected = IllegalArgumentException.class)
316 public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
317 new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
321 public void testKafkaConsumerWrapper() {
322 // verify that different wrappers can be built
323 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
326 @Test(expected = IllegalArgumentException.class)
327 public void testKafkaConsumerWrapper_InvalidTopic() {
328 new KafkaConsumerWrapper(makeBuilder().topic(null).build());
332 public void testKafkaConsumerWrapperFetch() {
334 //Setup Properties for consumer
335 Properties kafkaProps = new Properties();
336 kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
337 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
338 kafkaProps.setProperty("enable.auto.commit", "true");
339 kafkaProps.setProperty("auto.commit.interval.ms", "1000");
340 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
341 "org.apache.kafka.common.serialization.StringDeserializer");
342 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
343 "org.apache.kafka.common.serialization.StringDeserializer");
344 kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
345 kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
347 KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
348 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
349 kafka.consumer = consumer;
351 assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
356 public void testFetchNoMessages() throws IOException {
357 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
358 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
360 when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
362 Iterable<String> result = kafkaConsumerWrapper.fetch();
364 verify(mockedKafkaConsumer, times(1)).poll(any());
366 assertThat(result != null);
368 assertThat(!result.iterator().hasNext());
370 mockedKafkaConsumer.close();
374 public void testFetchWithMessages() {
376 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
377 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
379 ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
380 Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
381 recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
382 ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
384 when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
386 Iterable<String> result = kafkaConsumerWrapper.fetch();
388 verify(mockedKafkaConsumer, times(1)).poll(any());
390 verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
392 assertThat(result != null);
394 assertThat(result.iterator().hasNext());
396 assertThat(result.iterator().next().equals("value"));
398 mockedKafkaConsumer.close();
402 public void testFetchWithMessagesAndTraceparent() {
404 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
405 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
407 ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
408 record.headers().add(
410 "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
413 Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
414 recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
415 ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
417 when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
419 Iterable<String> result = kafkaConsumerWrapper.fetch();
421 verify(mockedKafkaConsumer, times(1)).poll(any());
423 verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
425 assertThat(result != null);
427 assertThat(result.iterator().hasNext());
429 assertThat(result.iterator().next().equals("value"));
431 mockedKafkaConsumer.close();
436 public void testKafkaConsumerWrapperClose() {
437 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
441 public void testKafkaConsumerWrapperToString() {
442 assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
445 private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
447 protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
448 super(busTopicParams);
452 public Iterable<String> fetch() {