/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
 * Modifications Copyright (C) 2022-2024 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.common.endpoints.event.comm.bus.internal;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wrapper around libraries to consume from message bus.
 */
public interface BusConsumer {

    /**
     * fetch messages.
     *
     * @return list of messages
     * @throws IOException when error encountered by underlying libraries
     */
    public Iterable<String> fetch() throws IOException;

    /**
     * close underlying library consumer.
     */
    public void close();
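
    // A minimal usage sketch (illustrative, not part of the original file): callers
    // typically poll fetch() in a loop and invoke close() on shutdown. The "running"
    // flag and handleMessage() below are hypothetical.
    //
    //   BusConsumer consumer = new KafkaConsumerWrapper(busTopicParams);
    //   try {
    //       while (running) {
    //           for (String message : consumer.fetch()) {
    //               handleMessage(message);
    //           }
    //       }
    //   } finally {
    //       consumer.close();
    //   }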

    /**
     * Consumer that handles fetch() failures by sleeping.
     */
    abstract class FetchingBusConsumer implements BusConsumer {

        private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);

        protected int fetchTimeout;

        /**
         * Time to sleep on a fetch failure.
         */
        private final int sleepTime;

        /**
         * Counted down when {@link #close()} is invoked.
         */
        private final CountDownLatch closeCondition = new CountDownLatch(1);

        /**
         * Constructs the object.
         *
         * @param busTopicParams parameters for the bus topic
         */
        protected FetchingBusConsumer(BusTopicParams busTopicParams) {
            this.fetchTimeout = busTopicParams.getFetchTimeout();

            if (this.fetchTimeout <= 0) {
                this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
            } else {
                // don't sleep too long, even if fetch timeout is large
                this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
            }
        }
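
        // Worked example of the capping above, assuming the default fetch timeout
        // constant is 15000ms: fetchTimeout=60000 gives sleepTime=15000, while
        // fetchTimeout=1000 gives sleepTime=1000, so the error backoff never waits
        // longer than one default fetch window.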

        /**
         * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
         * or the thread is interrupted, then this will return immediately.
         */
        protected void sleepAfterFetchFailure() {
            try {
                logger.info("{}: backoff for {}ms", this, sleepTime);
                // the latch doubles as the sleep timer: close() counts it down, which
                // releases this wait immediately
                if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
                    logger.info("{}: closed while handling fetch error", this);
                }

            } catch (InterruptedException e) {
                logger.warn("{}: interrupted while handling fetch error", this, e);
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void close() {
            this.closeCondition.countDown();
        }
    }

    /**
     * Kafka based consumer.
     */
    class KafkaConsumerWrapper extends FetchingBusConsumer {

        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);

        private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

        protected KafkaConsumer<String, String> consumer;
        protected Properties kafkaProps;

        protected boolean allowTracing;

        /**
         * Kafka Consumer Wrapper. The BusTopicParams object supplies the configuration,
         * including servers (messaging bus hosts) and the topic to subscribe to.
         *
         * @param busTopicParams - The parameters for the bus topic
         */
        public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
            super(busTopicParams);

            if (busTopicParams.isTopicInvalid()) {
                throw new IllegalArgumentException("No topic for Kafka");
            }

            // set up properties for the consumer
            kafkaProps = new Properties();
            kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    busTopicParams.getServers().get(0));

            if (busTopicParams.isAdditionalPropsValid()) {
                kafkaProps.putAll(busTopicParams.getAdditionalProps());
            }

            // fall back to defaults for any setting not already supplied
            if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
            }
            if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
            }
            if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
            }
            if (busTopicParams.isAllowTracing()) {
                this.allowTracing = true;
                kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        TracingConsumerInterceptor.class.getName());
            }

            consumer = new KafkaConsumer<>(kafkaProps);
            // subscribe to the topic
            consumer.subscribe(List.of(busTopicParams.getTopic()));
        }
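
        // Example construction (a sketch, assuming the fluent builder that
        // BusTopicParams provides in this package; the values are illustrative only):
        //
        //   BusTopicParams params = BusTopicParams.builder()
        //           .servers(List.of("localhost:9092"))
        //           .topic("my-topic")
        //           .consumerGroup("my-group")
        //           .fetchTimeout(15000)
        //           .build();
        //   BusConsumer consumer = new KafkaConsumerWrapper(params);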

        @Override
        public Iterable<String> fetch() {
            ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
            if (records == null || records.count() <= 0) {
                return Collections.emptyList();
            }
            List<String> messages = new ArrayList<>(records.count());
            try {
                if (allowTracing) {
                    createParentTraceContext(records);
                }

                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                        messages.add(partitionRecord.value());
                    }
                    // commit the offset just past the last record returned, so these
                    // messages are not redelivered on the next poll
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            } catch (Exception e) {
                logger.error("{}: cannot fetch, throwing exception after sleep...", this);
                sleepAfterFetchFailure();
                throw e;
            }
            return messages;
        }

        private void createParentTraceContext(ConsumerRecords<String, String> records) {
            TraceParentInfo traceParentInfo = new TraceParentInfo();
            for (ConsumerRecord<String, String> consumerRecord : records) {
                // note: if several records carry a traceparent header, the last one wins
                Headers consumerRecordHeaders = consumerRecord.headers();
                traceParentInfo = processTraceParentHeader(consumerRecordHeaders);
            }

            SpanContext spanContext = SpanContext.createFromRemoteParent(
                    traceParentInfo.getTraceId(), traceParentInfo.getSpanId(),
                    TraceFlags.getSampled(), TraceState.builder().build());

            Context.current().with(Span.wrap(spanContext)).makeCurrent();
        }

        private TraceParentInfo processTraceParentHeader(Headers headers) {
            TraceParentInfo traceParentInfo = new TraceParentInfo();
            if (headers.lastHeader("traceparent") != null) {
                traceParentInfo.setParentTraceId(new String(headers.lastHeader(
                        "traceparent").value(), StandardCharsets.UTF_8));

                String[] parts = traceParentInfo.getParentTraceId().split("-");
                traceParentInfo.setTraceId(parts[1]);
                traceParentInfo.setSpanId(parts[2]);
            }

            return traceParentInfo;
        }
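
        // For reference, a W3C traceparent header has the form
        // "version-traceId-spanId-flags", e.g.
        //   00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
        // so parts[1] above is the 32-hex-digit trace id and parts[2] is the
        // 16-hex-digit parent span id.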

        @Data
        @NoArgsConstructor
        private static class TraceParentInfo {
            private String parentTraceId;
            private String traceId;
            private String spanId;
        }

        @Override
        public void close() {
            super.close();
            this.consumer.close();
            logger.info("Kafka Consumer exited {}", this);
        }

        @Override
        public String toString() {
            return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
        }
    }
}