2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.jupiter.api.Assertions.assertEquals;
28 import static org.junit.jupiter.api.Assertions.assertFalse;
29 import static org.junit.jupiter.api.Assertions.assertNotNull;
30 import static org.junit.jupiter.api.Assertions.assertThrows;
31 import static org.junit.jupiter.api.Assertions.assertTrue;
32 import static org.mockito.ArgumentMatchers.any;
33 import static org.mockito.Mockito.times;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
37 import java.nio.charset.StandardCharsets;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.List;
42 import java.util.Properties;
43 import java.util.concurrent.CountDownLatch;
44 import org.apache.kafka.clients.consumer.ConsumerConfig;
45 import org.apache.kafka.clients.consumer.ConsumerRecord;
46 import org.apache.kafka.clients.consumer.ConsumerRecords;
47 import org.apache.kafka.clients.consumer.KafkaConsumer;
48 import org.apache.kafka.common.TopicPartition;
49 import org.junit.jupiter.api.AfterEach;
50 import org.junit.jupiter.api.BeforeEach;
51 import org.junit.jupiter.api.Test;
52 import org.mockito.Mock;
53 import org.mockito.MockitoAnnotations;
54 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
55 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
56 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
57 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
59 class BusConsumerTest extends TopicTestBase {
61 private static final int SHORT_TIMEOUT_MILLIS = 10;
62 private static final int LONG_TIMEOUT_MILLIS = 3000;
65 KafkaConsumer<String, String> mockedKafkaConsumer;
67 AutoCloseable closeable;
73 closeable = MockitoAnnotations.openMocks(this);
77 public void tearDown() throws Exception {
83 void testFetchingBusConsumer() {
84 // should not be negative
85 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
86 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
89 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
90 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
92 // should not be too large
93 cons = new FetchingBusConsumerImpl(
94 makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
95 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
97 // should not be what was specified
98 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
99 assertThat(cons.getSleepTime()).isEqualTo(100);
103 void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
105 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
107 private CountDownLatch started = new CountDownLatch(1);
110 protected void sleepAfterFetchFailure() {
112 super.sleepAfterFetchFailure();
117 long tstart = System.currentTimeMillis();
118 cons.sleepAfterFetchFailure();
119 assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
121 // close while sleeping - sleep should halt prematurely
122 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
123 cons.started = new CountDownLatch(1);
124 Thread thread = new Thread(cons::sleepAfterFetchFailure);
125 tstart = System.currentTimeMillis();
127 cons.started.await();
130 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
132 // interrupt while sleeping - sleep should halt prematurely
133 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
134 cons.started = new CountDownLatch(1);
135 thread = new Thread(cons::sleepAfterFetchFailure);
136 tstart = System.currentTimeMillis();
138 cons.started.await();
141 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
145 void testKafkaConsumerWrapper() {
146 // verify that different wrappers can be built
147 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
151 void testKafkaConsumerWrapper_InvalidTopic() {
152 BusTopicParams params = makeBuilder().topic(null).build();
153 assertThatThrownBy(() -> new KafkaConsumerWrapper(params))
154 .isInstanceOf(IllegalArgumentException.class);
158 void testKafkaConsumerWrapperFetch() {
160 //Setup Properties for consumer
161 Properties kafkaProps = new Properties();
162 kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
163 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
164 kafkaProps.setProperty("enable.auto.commit", "true");
165 kafkaProps.setProperty("auto.commit.interval.ms", "1000");
166 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
167 "org.apache.kafka.common.serialization.StringDeserializer");
168 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
169 "org.apache.kafka.common.serialization.StringDeserializer");
170 kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
171 kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
173 KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
174 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
175 kafka.consumer = consumer;
177 assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
182 void testFetchNoMessages() {
183 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
184 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
186 when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
188 Iterable<String> result = kafkaConsumerWrapper.fetch();
190 verify(mockedKafkaConsumer).poll(any());
192 assertNotNull(result);
194 assertFalse(result.iterator().hasNext());
196 mockedKafkaConsumer.close();
200 void testFetchWithMessages() {
202 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
203 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
205 ConsumerRecord<String, String> customerRecord =
206 new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
207 Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
208 recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(customerRecord));
209 ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
211 when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
213 Iterable<String> result = kafkaConsumerWrapper.fetch();
215 verify(mockedKafkaConsumer, times(1)).poll(any());
217 verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
219 assertNotNull(result);
221 assertTrue(result.iterator().hasNext());
223 assertEquals("value", result.iterator().next());
225 mockedKafkaConsumer.close();
229 void testFetchWithMessagesAndTraceparent() {
231 KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
232 kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
234 ConsumerRecord<String, String> customerRecord =
235 new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
236 customerRecord.headers().add(
238 "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
241 Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
242 recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(customerRecord));
243 ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
245 when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
247 Iterable<String> result = kafkaConsumerWrapper.fetch();
249 verify(mockedKafkaConsumer, times(1)).poll(any());
251 verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
253 assertNotNull(result);
255 assertTrue(result.iterator().hasNext());
257 assertEquals("value", result.iterator().next());
259 mockedKafkaConsumer.close();
264 void testKafkaConsumerWrapperClose() {
265 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
269 void testKafkaConsumerWrapperToString() {
270 assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
273 private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
275 protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
276 super(busTopicParams);
280 public Iterable<String> fetch() {