/*
 * ============LICENSE_START=======================================================
 * policy-endpoints
 * ================================================================================
 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
 * Modifications Copyright (C) 2022-2024 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.common.endpoints.event.comm.bus.internal;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wrapper around libraries to consume from a message bus.
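 *
 * <p>Illustrative usage (a minimal sketch; the driver loop and the {@code process}
 * handler are assumptions, not part of this interface):
 * <pre>{@code
 * BusConsumer consumer = new BusConsumer.KafkaConsumerWrapper(params);
 * try {
 *     for (String message : consumer.fetch()) {
 *         process(message); // hypothetical message handler
 *     }
 * } finally {
 *     consumer.close();
 * }
 * }</pre>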
 */
public interface BusConsumer {

    /**
     * Fetches messages.
     *
     * @return list of messages
     * @throws IOException when an error is encountered by the underlying libraries
     */
    Iterable<String> fetch() throws IOException;

    /**
     * Closes the underlying library consumer.
     */
    void close();

    /**
     * Consumer that handles fetch() failures by sleeping.
     */
    abstract class FetchingBusConsumer implements BusConsumer {
        private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);

        /**
         * Fetch timeout.
         */
        protected int fetchTimeout;

        /**
         * Time to sleep on a fetch failure.
         */
        @Getter
        private final int sleepTime;

        /**
         * Counted down when {@link #close()} is invoked.
         */
        private final CountDownLatch closeCondition = new CountDownLatch(1);


        /**
         * Constructs the object.
         *
         * @param busTopicParams parameters for the bus topic
         */
        protected FetchingBusConsumer(BusTopicParams busTopicParams) {
            this.fetchTimeout = busTopicParams.getFetchTimeout();

            if (this.fetchTimeout <= 0) {
                this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
            } else {
                // don't sleep too long, even if the fetch timeout is large
                this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
            }
        }

        /**
         * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
         * or the thread is interrupted, then this returns immediately.
         */
        protected void sleepAfterFetchFailure() {
            try {
                logger.info("{}: backoff for {}ms", this, sleepTime);
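                // the latch doubles as an interruptible sleep: close() counts it down and wakes this early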
                if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
                    logger.info("{}: closed while handling fetch error", this);
                }

            } catch (InterruptedException e) {
                logger.warn("{}: interrupted while handling fetch error", this, e);
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void close() {
            this.closeCondition.countDown();
        }
    }

    /**
     * Kafka-based consumer.
     */
    class KafkaConsumerWrapper extends FetchingBusConsumer {

        /**
         * Logger.
         */
        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);

        // the same String deserializer is used for both keys and values
        private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

        /**
         * Kafka consumer.
         */
        protected KafkaConsumer<String, String> consumer;
        protected Properties kafkaProps;

        protected boolean allowTracing;

        /**
         * Constructs the wrapper and subscribes to the configured topic. The relevant
         * {@link BusTopicParams} fields are {@code servers} (the messaging bus hosts) and
         * {@code topic} (the topic to consume from).
         *
         * @param busTopicParams the parameters for the bus topic
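         *
         *        <p>A sketch of building the parameters (the builder methods shown are
         *        assumptions matching the getters used below, not a confirmed API):
         *        <pre>{@code
         *        BusTopicParams params = BusTopicParams.builder()
         *                .servers(List.of("localhost:9092"))
         *                .topic("my-topic")
         *                .consumerGroup("my-group")
         *                .fetchTimeout(15000)
         *                .allowTracing(false)
         *                .build();
         *        }</pre>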
         */
        public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
            super(busTopicParams);

            if (busTopicParams.isTopicInvalid()) {
                throw new IllegalArgumentException("No topic for Kafka");
            }

            // set up the properties for the consumer
            kafkaProps = new Properties();
            kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                busTopicParams.getServers().get(0));

            if (busTopicParams.isAdditionalPropsValid()) {
                kafkaProps.putAll(busTopicParams.getAdditionalProps());
            }

            // default the deserializers and group id unless they were explicitly configured
            if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
            }
            if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
            }
            if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
            }
            if (busTopicParams.isAllowTracing()) {
                this.allowTracing = true;
                kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        TracingConsumerInterceptor.class.getName());
            }

            consumer = new KafkaConsumer<>(kafkaProps);
            // subscribe to the topic
            consumer.subscribe(List.of(busTopicParams.getTopic()));
        }

        @Override
        public Iterable<String> fetch() {
            ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
            if (records == null || records.count() <= 0) {
                return Collections.emptyList();
            }
            List<String> messages = new ArrayList<>(records.count());
            try {
                if (allowTracing) {
                    createParentTraceContext(records);
                }

                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                        messages.add(partitionRecord.value());
                    }
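                    // commit manually, per partition: lastOffset + 1 points at the next record to
                    // read, so a restarted consumer resumes after the last message returned here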
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            } catch (Exception e) {
                logger.error("{}: cannot fetch, throwing exception after sleep...", this, e);
                sleepAfterFetchFailure();
                throw e;
            }
            return messages;
        }

        private void createParentTraceContext(ConsumerRecords<String, String> records) {
            TraceParentInfo traceParentInfo = new TraceParentInfo();
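            // scan all records; only the traceparent header of the last record is kept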
            for (ConsumerRecord<String, String> consumerRecord : records) {
                Headers consumerRecordHeaders = consumerRecord.headers();
                traceParentInfo = processTraceParentHeader(consumerRecordHeaders);
            }

            SpanContext spanContext = SpanContext.createFromRemoteParent(
                    traceParentInfo.getTraceId(), traceParentInfo.getSpanId(),
                    TraceFlags.getSampled(), TraceState.builder().build());

            Context.current().with(Span.wrap(spanContext)).makeCurrent();
        }

        private TraceParentInfo processTraceParentHeader(Headers headers) {
            TraceParentInfo traceParentInfo = new TraceParentInfo();
            var traceParentHeader = headers.lastHeader("traceparent");
            if (traceParentHeader != null) {
                traceParentInfo.setParentTraceId(new String(traceParentHeader.value(), StandardCharsets.UTF_8));

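                // W3C trace context format: "version-traceid-parentid-flags"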
                String[] parts = traceParentInfo.getParentTraceId().split("-");
                traceParentInfo.setTraceId(parts[1]);
                traceParentInfo.setSpanId(parts[2]);
            }

            return traceParentInfo;
        }

        @Data
        @NoArgsConstructor
        private static class TraceParentInfo {
            private String parentTraceId;
            private String traceId;
            private String spanId;
        }

        @Override
        public void close() {
            super.close();
            this.consumer.close();
            logger.info("Kafka consumer closed: {}", this);
        }

        @Override
        public String toString() {
            return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
        }
    }
}