/*
 * ============LICENSE_START=======================================================
 * policy-endpoints
 * ================================================================================
 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
 * Modifications Copyright (C) 2022-2024 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.common.endpoints.event.comm.bus.internal;

import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wrapper around libraries to consume from message bus.
 */
public interface BusConsumer {

    /**
     * Fetch messages.
     *
     * @return list of messages
     * @throws IOException when an error is encountered by the underlying libraries
     */
    Iterable<String> fetch() throws IOException;

    /**
     * Close the underlying library consumer.
     */
    void close();

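    /*
     * A minimal usage sketch (illustrative only, not part of the original API
     * documentation): fetch() blocks up to the configured fetch timeout and may
     * return an empty list; close() must be called to release the underlying
     * client. Here "params" is a hypothetical, pre-populated BusTopicParams
     * and handle() a hypothetical message handler.
     *
     *   BusConsumer consumer = new KafkaConsumerWrapper(params);
     *   try {
     *       for (String message : consumer.fetch()) {
     *           handle(message);
     *       }
     *   } finally {
     *       consumer.close();
     *   }
     */
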
    /**
     * Consumer that handles fetch() failures by sleeping.
     */
    abstract class FetchingBusConsumer implements BusConsumer {
        private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);

        /**
         * Fetch timeout.
         */
        protected int fetchTimeout;

        /**
         * Time to sleep on a fetch failure.
         */
        @Getter
        private final int sleepTime;

        /**
         * Counted down when {@link #close()} is invoked.
         */
        private final CountDownLatch closeCondition = new CountDownLatch(1);

        /**
         * Constructs the object.
         *
         * @param busTopicParams parameters for the bus topic
         */
        protected FetchingBusConsumer(BusTopicParams busTopicParams) {
            this.fetchTimeout = busTopicParams.getFetchTimeout();

            if (this.fetchTimeout <= 0) {
                this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
            } else {
                // don't sleep too long, even if fetch timeout is large
                this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
            }
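
            // Resulting back-off examples (illustrative only; assumes,
            // hypothetically, that DEFAULT_TIMEOUT_MS_FETCH is 15000):
            //   fetchTimeout = 1000  -> sleepTime = 1000
            //   fetchTimeout = 60000 -> sleepTime = 15000
            //   fetchTimeout <= 0    -> sleepTime = 15000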
        }

        /**
         * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
         * or the thread is interrupted, then this will return immediately.
         */
        protected void sleepAfterFetchFailure() {
            try {
                logger.info("{}: backoff for {}ms", this, sleepTime);
                if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
                    logger.info("{}: closed while handling fetch error", this);
                }

            } catch (InterruptedException e) {
                logger.warn("{}: interrupted while handling fetch error", this, e);
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void close() {
            this.closeCondition.countDown();
        }
    }

    /**
     * Cambria based consumer.
     */
    class CambriaConsumerWrapper extends FetchingBusConsumer {

        /**
         * logger.
         */
        private static final Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);

        /**
         * Used to build the consumer.
         */
        private final ConsumerBuilder builder;

        /**
         * Cambria client.
         */
        private final CambriaConsumer consumer;

        /**
         * Cambria Consumer Wrapper.
         * The BusTopicParams object contains the following parameters:
         * servers - messaging bus hosts
         * topic - topic for messages
         * apiKey - API Key
         * apiSecret - API Secret
         * consumerGroup - Consumer Group
         * consumerInstance - Consumer Instance
         * fetchTimeout - Fetch Timeout
         * fetchLimit - Fetch Limit
         *
         * @param busTopicParams - The parameters for the bus topic
         */
        public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
            super(busTopicParams);

            this.builder = new CambriaClientBuilders.ConsumerBuilder();

            builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
                .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
                .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());

            // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
            builder.withSocketTimeout(fetchTimeout + 30000);

            if (busTopicParams.isUseHttps()) {
                builder.usingHttps();

                if (busTopicParams.isAllowSelfSignedCerts()) {
                    builder.allowSelfSignedCertificates();
                }
            }

            if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
                builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
            }

            if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
                builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
            }

            try {
                this.consumer = builder.build();
            } catch (MalformedURLException | GeneralSecurityException e) {
                throw new IllegalArgumentException(e);
            }
        }
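
        /*
         * Illustrative construction sketch only (assumes the BusTopicParams
         * builder used elsewhere in this codebase; all values are hypothetical):
         *
         *   BusConsumer consumer = new CambriaConsumerWrapper(BusTopicParams.builder()
         *       .servers(List.of("localhost"))
         *       .topic("my-topic")
         *       .consumerGroup("my-group")
         *       .consumerInstance("1")
         *       .fetchTimeout(15000)
         *       .fetchLimit(100)
         *       .build());
         */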

        @Override
        public Iterable<String> fetch() throws IOException {
            try {
                return this.consumer.fetch();
            } catch (final IOException e) { //NOSONAR
                logger.error("{}: cannot fetch because of {}", this, e.getMessage());
                sleepAfterFetchFailure();
                throw e;
            }
        }

        @Override
        public void close() {
            super.close();
            this.consumer.close();
        }

        @Override
        public String toString() {
            return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
        }
    }

    /**
     * Kafka based consumer.
     */
    class KafkaConsumerWrapper extends FetchingBusConsumer {

        /**
         * logger.
         */
        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);

        // StringDeserializer is used for both message keys and values
        private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

        /**
         * Kafka consumer.
         */
        protected KafkaConsumer<String, String> consumer;
        protected Properties kafkaProps;

        protected boolean allowTracing;

        /**
         * Kafka Consumer Wrapper.
         * The BusTopicParams object contains the following parameters:
         * servers - messaging bus hosts
         * topic - topic
         *
         * @param busTopicParams - The parameters for the bus topic
         */
        public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
            super(busTopicParams);

            if (busTopicParams.isTopicInvalid()) {
                throw new IllegalArgumentException("No topic for Kafka");
            }

            // Set up the properties for the consumer; only the first configured
            // server is used as the bootstrap server
            kafkaProps = new Properties();
            kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                busTopicParams.getServers().get(0));

            if (busTopicParams.isAdditionalPropsValid()) {
                kafkaProps.putAll(busTopicParams.getAdditionalProps());
            }

            if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
            }
            if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
            }
            if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
                kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
            }
            if (busTopicParams.isAllowTracing()) {
                this.allowTracing = true;
                kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        TracingConsumerInterceptor.class.getName());
            }

            consumer = new KafkaConsumer<>(kafkaProps);
            // Subscribe to the topic
            consumer.subscribe(List.of(busTopicParams.getTopic()));
        }
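
        /*
         * Illustrative construction sketch only (builder usage assumed, values
         * hypothetical); any extra KafkaConsumer settings can be supplied via
         * the additional-properties map checked above:
         *
         *   BusConsumer consumer = new KafkaConsumerWrapper(BusTopicParams.builder()
         *       .servers(List.of("localhost:9092"))
         *       .topic("my-topic")
         *       .consumerGroup("my-group")
         *       .fetchTimeout(15000)
         *       .build());
         */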

        @Override
        public Iterable<String> fetch() {
            ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
            if (records == null || records.isEmpty()) {
                return Collections.emptyList();
            }
            List<String> messages = new ArrayList<>(records.count());
            try {
                if (allowTracing) {
                    createParentTraceContext(records);
                }

                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                        messages.add(partitionRecord.value());
                    }
                    // commit the offset just past the last record consumed from this partition
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            } catch (Exception e) {
                logger.error("{}: cannot fetch, throwing exception after sleep...", this, e);
                sleepAfterFetchFailure();
                throw e;
            }
            return messages;
        }

        private void createParentTraceContext(ConsumerRecords<String, String> records) {
            TraceParentInfo traceParentInfo = new TraceParentInfo();
            for (ConsumerRecord<String, String> consumerRecord : records) {
                // note: if multiple records carry a traceparent header, only the
                // last one examined is used as the parent
                Headers consumerRecordHeaders = consumerRecord.headers();
                traceParentInfo = processTraceParentHeader(consumerRecordHeaders);
            }

            SpanContext spanContext = SpanContext.createFromRemoteParent(
                    traceParentInfo.getTraceId(), traceParentInfo.getSpanId(),
                    TraceFlags.getSampled(), TraceState.builder().build());

            // attach the remote parent to the current thread's context
            Context.current().with(Span.wrap(spanContext)).makeCurrent();
        }

        private TraceParentInfo processTraceParentHeader(Headers headers) {
            TraceParentInfo traceParentInfo = new TraceParentInfo();
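            // A W3C Trace Context "traceparent" header has the form
            // "version-traceid-spanid-flags", e.g.
            // "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
            // so after splitting on "-", parts[1] is the trace id and
            // parts[2] is the span id.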
            if (headers.lastHeader("traceparent") != null) {
                traceParentInfo.setParentTraceId(new String(headers.lastHeader(
                        "traceparent").value(), StandardCharsets.UTF_8));

                String[] parts = traceParentInfo.getParentTraceId().split("-");
                traceParentInfo.setTraceId(parts[1]);
                traceParentInfo.setSpanId(parts[2]);
            }

            return traceParentInfo;
        }

        @Data
        @NoArgsConstructor
        private static class TraceParentInfo {
            private String parentTraceId;
            private String traceId;
            private String spanId;
        }

        @Override
        public void close() {
            super.close();
            this.consumer.close();
            logger.info("Kafka Consumer exited {}", this);
        }

        @Override
        public String toString() {
            return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
        }
    }
}