2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertThrows;
29 import static org.junit.Assert.fail;
30 import static org.mockito.ArgumentMatchers.any;
31 import static org.mockito.Mockito.mock;
32 import static org.mockito.Mockito.times;
33 import static org.mockito.Mockito.verify;
34 import static org.mockito.Mockito.when;
36 import com.att.nsa.cambria.client.CambriaConsumer;
37 import java.io.IOException;
38 import java.nio.charset.StandardCharsets;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.HashMap;
42 import java.util.List;
44 import java.util.Properties;
45 import java.util.concurrent.CountDownLatch;
46 import org.apache.commons.collections4.IteratorUtils;
47 import org.apache.kafka.clients.consumer.ConsumerConfig;
48 import org.apache.kafka.clients.consumer.ConsumerRecord;
49 import org.apache.kafka.clients.consumer.ConsumerRecords;
50 import org.apache.kafka.clients.consumer.KafkaConsumer;
51 import org.apache.kafka.common.TopicPartition;
52 import org.junit.Before;
53 import org.junit.Test;
54 import org.mockito.Mock;
55 import org.mockito.MockitoAnnotations;
56 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
57 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
58 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
59 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
60 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
61 import org.springframework.test.util.ReflectionTestUtils;
63 public class BusConsumerTest extends TopicTestBase {
65 private static final int SHORT_TIMEOUT_MILLIS = 10;
66 private static final int LONG_TIMEOUT_MILLIS = 3000;
69 KafkaConsumer<String, String> mockedKafkaConsumer;
75 MockitoAnnotations.initMocks(this);
80 public void testFetchingBusConsumer() {
81 // should not be negative
82 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
83 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
86 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
87 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
89 // should not be too large
90 cons = new FetchingBusConsumerImpl(
91 makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
92 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
94 // should not be what was specified
95 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
96 assertThat(cons.getSleepTime()).isEqualTo(100);
100 public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
102 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
104 private CountDownLatch started = new CountDownLatch(1);
107 protected void sleepAfterFetchFailure() {
109 super.sleepAfterFetchFailure();
114 long tstart = System.currentTimeMillis();
115 cons.sleepAfterFetchFailure();
116 assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
118 // close while sleeping - sleep should halt prematurely
119 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
120 cons.started = new CountDownLatch(1);
121 Thread thread = new Thread(cons::sleepAfterFetchFailure);
122 tstart = System.currentTimeMillis();
124 cons.started.await();
127 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
129 // interrupt while sleeping - sleep should halt prematurely
130 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
131 cons.started = new CountDownLatch(1);
132 thread = new Thread(cons::sleepAfterFetchFailure);
133 tstart = System.currentTimeMillis();
135 cons.started.await();
138 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
142 public void testCambriaConsumerWrapper() {
143 // verify that different wrappers can be built
144 new CambriaConsumerWrapper(makeBuilder().build());
145 new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
146 new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
147 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
148 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
149 new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
150 new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
151 new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
152 new CambriaConsumerWrapper(makeBuilder().userName(null).build());
153 new CambriaConsumerWrapper(makeBuilder().password(null).build());
155 assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
156 .doesNotThrowAnyException();
160 public void testCambriaConsumerWrapperFetch() throws Exception {
161 CambriaConsumer inner = mock(CambriaConsumer.class);
162 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
163 when(inner.fetch()).thenReturn(lst);
165 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
166 ReflectionTestUtils.setField(cons, "consumer", inner);
168 assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
170 // arrange to throw exception next time fetch is called
171 IOException ex = new IOException(EXPECTED);
172 when(inner.fetch()).thenThrow(ex);
174 cons.fetchTimeout = 10;
178 fail("missing exception");
180 } catch (IOException e) {
186 public void testCambriaConsumerWrapperClose() {
187 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
188 assertThatCode(cons::close).doesNotThrowAnyException();
192 public void testCambriaConsumerWrapperToString() {
193 assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
197 public void testKafkaConsumerWrapper() {
198 // verify that different wrappers can be built
199 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
202 @Test(expected = IllegalArgumentException.class)
203 public void testKafkaConsumerWrapper_InvalidTopic() {
204 new KafkaConsumerWrapper(makeBuilder().topic(null).build());
208 public void testKafkaConsumerWrapperFetch() {
210 //Setup Properties for consumer
211 Properties kafkaProps = new Properties();
212 kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
213 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
214 kafkaProps.setProperty("enable.auto.commit", "true");
215 kafkaProps.setProperty("auto.commit.interval.ms", "1000");
216 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
217 "org.apache.kafka.common.serialization.StringDeserializer");
218 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
219 "org.apache.kafka.common.serialization.StringDeserializer");
220 kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
221 kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
223 KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
224 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
225 kafka.consumer = consumer;
227 assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
232 public void testFetchNoMessages() throws IOException {
233 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
234 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
236 when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
238 Iterable<String> result = kafkaConsumerWrapper.fetch();
240 verify(mockedKafkaConsumer, times(1)).poll(any());
242 assertThat(result != null);
244 assertThat(!result.iterator().hasNext());
246 mockedKafkaConsumer.close();
250 public void testFetchWithMessages() {
252 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
253 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
255 ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
256 Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
257 recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
258 ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
260 when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
262 Iterable<String> result = kafkaConsumerWrapper.fetch();
264 verify(mockedKafkaConsumer, times(1)).poll(any());
266 verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
268 assertThat(result != null);
270 assertThat(result.iterator().hasNext());
272 assertThat(result.iterator().next().equals("value"));
274 mockedKafkaConsumer.close();
278 public void testFetchWithMessagesAndTraceparent() {
280 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
281 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
283 ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
284 record.headers().add(
286 "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
289 Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
290 recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
291 ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
293 when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
295 Iterable<String> result = kafkaConsumerWrapper.fetch();
297 verify(mockedKafkaConsumer, times(1)).poll(any());
299 verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
301 assertThat(result != null);
303 assertThat(result.iterator().hasNext());
305 assertThat(result.iterator().next().equals("value"));
307 mockedKafkaConsumer.close();
312 public void testKafkaConsumerWrapperClose() {
313 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
317 public void testKafkaConsumerWrapperToString() {
318 assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
321 private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
323 protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
324 super(busTopicParams);
328 public Iterable<String> fetch() {