/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
 * Modifications Copyright (C) 2022-2024 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.common.endpoints.event.comm.bus.internal;

import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.jetbrains.annotations.NotNull;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wrapper around libraries to consume from message bus.
 */
public interface BusConsumer {

    /**
     * fetch messages.
     *
     * @return list of messages
     * @throws IOException when error encountered by underlying libraries
     */
    public Iterable<String> fetch() throws IOException;

    /**
     * close underlying library consumer.
     */
    public void close();
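
    /*
     * Illustrative polling loop (a sketch, not part of the API contract).
     * The concrete wrapper, the BusTopicParams builder methods, and the
     * process() handler below are assumptions for the example:
     *
     *   BusConsumer consumer = new BusConsumer.KafkaConsumerWrapper(
     *           BusTopicParams.builder()
     *                   .servers(List.of("localhost:9092"))
     *                   .topic("my-topic")
     *                   .consumerGroup("my-group")
     *                   .fetchTimeout(15000)
     *                   .build());
     *   try {
     *       for (String message : consumer.fetch()) {
     *           process(message);   // hypothetical application handler
     *       }
     *   } finally {
     *       consumer.close();
     *   }
     */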

    /**
     * Consumer that handles fetch() failures by sleeping.
     */
    abstract class FetchingBusConsumer implements BusConsumer {
        private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);

        /**
         * Fetch timeout.
         */
        protected int fetchTimeout;

        /**
         * Time to sleep on a fetch failure.
         */
        @Getter
        private final int sleepTime;

        /**
         * Counted down when {@link #close()} is invoked.
         */
        private final CountDownLatch closeCondition = new CountDownLatch(1);

        /**
         * Constructs the object.
         *
         * @param busTopicParams parameters for the bus topic
         */
        protected FetchingBusConsumer(BusTopicParams busTopicParams) {
            this.fetchTimeout = busTopicParams.getFetchTimeout();

            if (this.fetchTimeout <= 0) {
                this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
            } else {
                // don't sleep too long, even if fetch timeout is large
                this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
            }
        }
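
        /*
         * Worked example of the sleepTime computation above (a sketch; the
         * 15000 figure assumes DEFAULT_TIMEOUT_MS_FETCH is 15000ms):
         *   fetchTimeout <= 0    -> sleepTime = 15000
         *   fetchTimeout = 1000  -> sleepTime = 1000
         *   fetchTimeout = 60000 -> sleepTime = 15000 (capped)
         */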

        /**
         * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
         * or the thread is interrupted, then this will return immediately.
         */
        protected void sleepAfterFetchFailure() {
            try {
                logger.info("{}: backoff for {}ms", this, sleepTime);

                if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
                    logger.info("{}: closed while handling fetch error", this);
                }

            } catch (InterruptedException e) {
                logger.warn("{}: interrupted while handling fetch error", this, e);
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void close() {
            this.closeCondition.countDown();
        }
    }

    /**
     * Cambria based consumer.
     */
    public static class CambriaConsumerWrapper extends FetchingBusConsumer {

        /**
         * logger.
         */
        private static final Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);

        /**
         * Used to build the consumer.
         */
        private final ConsumerBuilder builder;

        /**
         * Cambria client.
         */
        private final CambriaConsumer consumer;

        /**
         * Cambria Consumer Wrapper.
         * BusTopicParam object contains the following parameters:
         * servers - messaging bus hosts
         * topic - topic for messages
         * apiKey - API Key
         * apiSecret - API Secret
         * consumerGroup - Consumer Group
         * consumerInstance - Consumer Instance
         * fetchTimeout - Fetch Timeout
         * fetchLimit - Fetch Limit
         *
         * @param busTopicParams - The parameters for the bus topic
         */
        public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
            super(busTopicParams);

            this.builder = new CambriaClientBuilders.ConsumerBuilder();

            builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
                    .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
                    .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());

            // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
            builder.withSocketTimeout(fetchTimeout + 30000);

            if (busTopicParams.isUseHttps()) {
                builder.usingHttps();

                if (busTopicParams.isAllowSelfSignedCerts()) {
                    builder.allowSelfSignedCertificates();
                }
            }

            if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
                builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
            }

            if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
                builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
            }

            try {
                this.consumer = builder.build();
            } catch (MalformedURLException | GeneralSecurityException e) {
                throw new IllegalArgumentException(e);
            }
        }
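
        /*
         * Construction sketch (illustrative values only; the BusTopicParams
         * builder methods shown are assumptions from this library's params
         * class, not verified API):
         *
         *   BusConsumer cambria = new CambriaConsumerWrapper(BusTopicParams.builder()
         *           .servers(List.of("dmaap.example.org"))
         *           .topic("POLICY-PDP-PAP")
         *           .consumerGroup("policy")
         *           .consumerInstance("1")
         *           .fetchTimeout(15000)
         *           .fetchLimit(100)
         *           .useHttps(true)
         *           .allowSelfSignedCerts(false)
         *           .build());
         */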

        @Override
        public Iterable<String> fetch() throws IOException {
            try {
                return this.consumer.fetch();
            } catch (final IOException e) { //NOSONAR
                logger.error("{}: cannot fetch because of {}", this, e.getMessage());
                sleepAfterFetchFailure();
                throw e;
            }
        }

        @Override
        public void close() {
            super.close();
            this.consumer.close();
        }

        @Override
        public String toString() {
            return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
        }
    }

    /**
     * Kafka based consumer.
     */
    class KafkaConsumerWrapper extends FetchingBusConsumer {

        /**
         * logger.
         */
        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);

        private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

        /**
         * Kafka consumer.
         */
        protected KafkaConsumer<String, String> consumer;
        protected Properties kafkaProps;

        protected boolean allowTracing;

        /**
         * Kafka Consumer Wrapper.
         * BusTopicParams object contains the following parameters:
         * servers - messaging bus hosts
         * topic - topic for messages
         *
         * @param busTopicParams - The parameters for the bus topic
         */
        public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
            super(busTopicParams);

            if (busTopicParams.isTopicInvalid()) {
                throw new IllegalArgumentException("No topic for Kafka");
            }

            // Setup Properties for consumer
            kafkaProps = new Properties();
            kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    busTopicParams.getServers().get(0));

            if (busTopicParams.isAdditionalPropsValid()) {
                kafkaProps.putAll(busTopicParams.getAdditionalProps());
            }

            if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
            }
            if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
            }
            if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
            }
            if (busTopicParams.isAllowTracing()) {
                this.allowTracing = true;
                kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        TracingConsumerInterceptor.class.getName());
            }

            consumer = new KafkaConsumer<>(kafkaProps);
            // Subscribe to the topic
            consumer.subscribe(List.of(busTopicParams.getTopic()));
        }
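
        /*
         * Property resolution sketch: additionalProps are applied before the
         * defaults above, and the defaults only fill in absent keys, so
         * caller-supplied properties win. For example (illustrative values):
         *
         *   additionalProps = {"max.poll.records": "100"}
         *
         * yields kafkaProps with bootstrap.servers from busTopicParams,
         * String key/value deserializers, group.id from consumerGroup,
         * plus max.poll.records=100.
         */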

        @Override
        public Iterable<String> fetch() {
            ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
            if (records == null || records.count() <= 0) {
                return Collections.emptyList();
            }

            List<String> messages = new ArrayList<>(records.count());
            try {
                if (allowTracing) {
                    createParentTraceContext(records);
                }

                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                        messages.add(partitionRecord.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            } catch (Exception e) {
                logger.error("{}: cannot fetch, throwing exception after sleep...", this);
                sleepAfterFetchFailure();
                throw e;
            }
            return messages;
        }
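
        /*
         * Note on the commit above: offsets are committed synchronously per
         * partition at lastOffset + 1, i.e. the position of the next record
         * to read. A crash between poll() and commitSync() therefore
         * re-delivers the uncommitted batch (at-least-once semantics).
         */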

        private void createParentTraceContext(ConsumerRecords<String, String> records) {
            TraceParentInfo traceParentInfo = new TraceParentInfo();
            for (ConsumerRecord<String, String> consumerRecord : records) {
                Headers consumerRecordHeaders = consumerRecord.headers();
                traceParentInfo = processTraceParentHeader(consumerRecordHeaders);
            }

            SpanContext spanContext = SpanContext.createFromRemoteParent(
                    traceParentInfo.getTraceId(), traceParentInfo.getSpanId(),
                    TraceFlags.getSampled(), TraceState.builder().build());

            Context.current().with(Span.wrap(spanContext)).makeCurrent();
        }
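
        /*
         * Note: the loop above retains only the traceparent of the last
         * record in the batch; that single remote parent is then made the
         * current OpenTelemetry context for the whole fetch.
         */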

        private TraceParentInfo processTraceParentHeader(Headers headers) {
            TraceParentInfo traceParentInfo = new TraceParentInfo();
            if (headers.lastHeader("traceparent") != null) {
                traceParentInfo.setParentTraceId(new String(headers.lastHeader(
                        "traceparent").value(), StandardCharsets.UTF_8));

                String[] parts = traceParentInfo.getParentTraceId().split("-");
                traceParentInfo.setTraceId(parts[1]);
                traceParentInfo.setSpanId(parts[2]);
            }

            return traceParentInfo;
        }
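
        /*
         * The W3C "traceparent" header has the form
         *   version-traceId-spanId-flags
         * e.g. 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
         * so parts[1] above is the trace id and parts[2] is the span id.
         */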

        @Data
        @NoArgsConstructor
        private static class TraceParentInfo {
            private String parentTraceId;
            private String traceId;
            private String spanId;
        }

        @Override
        public void close() {
            super.close();
            this.consumer.close();
            logger.info("Kafka Consumer exited {}", this);
        }

        @Override
        public String toString() {
            return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
        }
    }
}