2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertThrows;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
41 import org.apache.kafka.clients.consumer.ConsumerConfig;
42 import org.apache.kafka.clients.consumer.ConsumerRecord;
43 import org.apache.kafka.clients.consumer.ConsumerRecords;
44 import org.apache.kafka.clients.consumer.KafkaConsumer;
45 import org.apache.kafka.common.TopicPartition;
46 import org.junit.After;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.mockito.Mock;
50 import org.mockito.MockitoAnnotations;
51 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
52 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
53 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
54 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
56 public class BusConsumerTest extends TopicTestBase {
58 private static final int SHORT_TIMEOUT_MILLIS = 10;
59 private static final int LONG_TIMEOUT_MILLIS = 3000;
62 KafkaConsumer<String, String> mockedKafkaConsumer;
64 AutoCloseable closeable;
70 closeable = MockitoAnnotations.openMocks(this);
74 public void tearDown() throws Exception {
80 public void testFetchingBusConsumer() {
81 // should not be negative
82 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
83 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
86 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
87 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
89 // should not be too large
90 cons = new FetchingBusConsumerImpl(
91 makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
92 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
94 // should not be what was specified
95 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
96 assertThat(cons.getSleepTime()).isEqualTo(100);
100 public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
102 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
104 private CountDownLatch started = new CountDownLatch(1);
107 protected void sleepAfterFetchFailure() {
109 super.sleepAfterFetchFailure();
114 long tstart = System.currentTimeMillis();
115 cons.sleepAfterFetchFailure();
116 assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
118 // close while sleeping - sleep should halt prematurely
119 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
120 cons.started = new CountDownLatch(1);
121 Thread thread = new Thread(cons::sleepAfterFetchFailure);
122 tstart = System.currentTimeMillis();
124 cons.started.await();
127 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
129 // interrupt while sleeping - sleep should halt prematurely
130 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
131 cons.started = new CountDownLatch(1);
132 thread = new Thread(cons::sleepAfterFetchFailure);
133 tstart = System.currentTimeMillis();
135 cons.started.await();
138 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
142 public void testKafkaConsumerWrapper() {
143 // verify that different wrappers can be built
144 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
147 @Test(expected = IllegalArgumentException.class)
148 public void testKafkaConsumerWrapper_InvalidTopic() {
149 new KafkaConsumerWrapper(makeBuilder().topic(null).build());
153 public void testKafkaConsumerWrapperFetch() {
155 //Setup Properties for consumer
156 Properties kafkaProps = new Properties();
157 kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
158 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
159 kafkaProps.setProperty("enable.auto.commit", "true");
160 kafkaProps.setProperty("auto.commit.interval.ms", "1000");
161 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
162 "org.apache.kafka.common.serialization.StringDeserializer");
163 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
164 "org.apache.kafka.common.serialization.StringDeserializer");
165 kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
166 kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
168 KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
169 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
170 kafka.consumer = consumer;
172 assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
177 public void testFetchNoMessages() throws IOException {
178 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
179 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
181 when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
183 Iterable<String> result = kafkaConsumerWrapper.fetch();
185 verify(mockedKafkaConsumer, times(1)).poll(any());
187 assertThat(result != null);
189 assertThat(!result.iterator().hasNext());
191 mockedKafkaConsumer.close();
195 public void testFetchWithMessages() {
197 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
198 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
200 ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
201 Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
202 recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
203 ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
205 when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
207 Iterable<String> result = kafkaConsumerWrapper.fetch();
209 verify(mockedKafkaConsumer, times(1)).poll(any());
211 verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
213 assertThat(result != null);
215 assertThat(result.iterator().hasNext());
217 assertThat(result.iterator().next().equals("value"));
219 mockedKafkaConsumer.close();
223 public void testFetchWithMessagesAndTraceparent() {
225 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
226 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
228 ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
229 record.headers().add(
231 "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
234 Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
235 recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
236 ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
238 when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
240 Iterable<String> result = kafkaConsumerWrapper.fetch();
242 verify(mockedKafkaConsumer, times(1)).poll(any());
244 verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
246 assertThat(result != null);
248 assertThat(result.iterator().hasNext());
250 assertThat(result.iterator().next().equals("value"));
252 mockedKafkaConsumer.close();
257 public void testKafkaConsumerWrapperClose() {
258 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
262 public void testKafkaConsumerWrapperToString() {
263 assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
266 private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
268 protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
269 super(busTopicParams);
273 public Iterable<String> fetch() {