539a78c2edc5b01de04a2abf9de6286988b18613
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8  * Modifications Copyright (C) 2022-2024 Nordix Foundation.
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import io.opentelemetry.api.trace.Span;
30 import io.opentelemetry.api.trace.SpanContext;
31 import io.opentelemetry.api.trace.TraceFlags;
32 import io.opentelemetry.api.trace.TraceState;
33 import io.opentelemetry.context.Context;
34 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
35 import java.io.IOException;
36 import java.net.MalformedURLException;
37 import java.nio.charset.StandardCharsets;
38 import java.security.GeneralSecurityException;
39 import java.time.Duration;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Properties;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import lombok.Data;
47 import lombok.Getter;
48 import lombok.NoArgsConstructor;
49 import org.apache.commons.lang3.StringUtils;
50 import org.apache.kafka.clients.consumer.ConsumerConfig;
51 import org.apache.kafka.clients.consumer.ConsumerRecord;
52 import org.apache.kafka.clients.consumer.ConsumerRecords;
53 import org.apache.kafka.clients.consumer.KafkaConsumer;
54 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
55 import org.apache.kafka.common.TopicPartition;
56 import org.apache.kafka.common.header.Headers;
57 import org.jetbrains.annotations.NotNull;
58 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * Wrapper around libraries to consume from message bus.
64  */
65 public interface BusConsumer {
66
67     /**
68      * fetch messages.
69      *
70      * @return list of messages
71      * @throws IOException when error encountered by underlying libraries
72      */
73     public Iterable<String> fetch() throws IOException;
74
75     /**
76      * close underlying library consumer.
77      */
78     public void close();
79
80     /**
81      * Consumer that handles fetch() failures by sleeping.
82      */
83     abstract class FetchingBusConsumer implements BusConsumer {
84         private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
85
86         /**
87          * Fetch timeout.
88          */
89         protected int fetchTimeout;
90
91         /**
92          * Time to sleep on a fetch failure.
93          */
94         @Getter
95         private final int sleepTime;
96
97         /**
98          * Counted down when {@link #close()} is invoked.
99          */
100         private final CountDownLatch closeCondition = new CountDownLatch(1);
101
102
103         /**
104          * Constructs the object.
105          *
106          * @param busTopicParams parameters for the bus topic
107          */
108         protected FetchingBusConsumer(BusTopicParams busTopicParams) {
109             this.fetchTimeout = busTopicParams.getFetchTimeout();
110
111             if (this.fetchTimeout <= 0) {
112                 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
113             } else {
114                 // don't sleep too long, even if fetch timeout is large
115                 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
116             }
117         }
118
119         /**
120          * Causes the thread to sleep; invoked after fetch() fails.  If the consumer is closed,
121          * or the thread is interrupted, then this will return immediately.
122          */
123         protected void sleepAfterFetchFailure() {
124             try {
125                 logger.info("{}: backoff for {}ms", this, sleepTime);
126                 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
127                     logger.info("{}: closed while handling fetch error", this);
128                 }
129
130             } catch (InterruptedException e) {
131                 logger.warn("{}: interrupted while handling fetch error", this, e);
132                 Thread.currentThread().interrupt();
133             }
134         }
135
136         @Override
137         public void close() {
138             this.closeCondition.countDown();
139         }
140     }
141
142     /**
143      * Cambria based consumer.
144      */
145     public static class CambriaConsumerWrapper extends FetchingBusConsumer {
146
147         /**
148          * logger.
149          */
150         private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
151
152         /**
153          * Used to build the consumer.
154          */
155         private final ConsumerBuilder builder;
156
157         /**
158          * Cambria client.
159          */
160         private final CambriaConsumer consumer;
161
162         /**
163          * Cambria Consumer Wrapper.
164          * BusTopicParam object contains the following parameters
165          * servers - messaging bus hosts.
166          * topic - topic for messages
167          * apiKey - API Key
168          * apiSecret - API Secret
169          * consumerGroup - Consumer Group
170          * consumerInstance - Consumer Instance
171          * fetchTimeout - Fetch Timeout
172          * fetchLimit - Fetch Limit
173          *
174          * @param busTopicParams - The parameters for the bus topic
175          */
176         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
177             super(busTopicParams);
178
179             this.builder = new CambriaClientBuilders.ConsumerBuilder();
180
181             builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
182                 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
183                 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
184
185             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
186             builder.withSocketTimeout(fetchTimeout + 30000);
187
188             if (busTopicParams.isUseHttps()) {
189                 builder.usingHttps();
190
191                 if (busTopicParams.isAllowSelfSignedCerts()) {
192                     builder.allowSelfSignedCertificates();
193                 }
194             }
195
196             if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
197                 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
198             }
199
200             if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
201                 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
202             }
203
204             try {
205                 this.consumer = builder.build();
206             } catch (MalformedURLException | GeneralSecurityException e) {
207                 throw new IllegalArgumentException(e);
208             }
209         }
210
211         @Override
212         public Iterable<String> fetch() throws IOException {
213             try {
214                 return this.consumer.fetch();
215             } catch (final IOException e) { //NOSONAR
216                 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
217                 sleepAfterFetchFailure();
218                 throw e;
219             }
220         }
221
222         @Override
223         public void close() {
224             super.close();
225             this.consumer.close();
226         }
227
228         @Override
229         public String toString() {
230             return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
231         }
232     }
233
234     /**
235      * Kafka based consumer.
236      */
237     class KafkaConsumerWrapper extends FetchingBusConsumer {
238
239         /**
240          * logger.
241          */
242         private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
243
244         private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
245
246         /**
247          * Kafka consumer.
248          */
249         protected KafkaConsumer<String, String> consumer;
250         protected Properties kafkaProps;
251
252         protected boolean allowTracing;
253
254         /**
255          * Kafka Consumer Wrapper.
256          * BusTopicParam - object contains the following parameters
257          * servers - messaging bus hosts.
258          * topic - topic
259          *
260          * @param busTopicParams - The parameters for the bus topic
261          */
262         public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
263             super(busTopicParams);
264
265             if (busTopicParams.isTopicInvalid()) {
266                 throw new IllegalArgumentException("No topic for Kafka");
267             }
268
269             //Setup Properties for consumer
270             kafkaProps = new Properties();
271             kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
272                 busTopicParams.getServers().get(0));
273
274             if (busTopicParams.isAdditionalPropsValid()) {
275                 kafkaProps.putAll(busTopicParams.getAdditionalProps());
276             }
277
278             if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
279                 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
280             }
281             if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
282                 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
283             }
284             if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
285                 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
286             }
287             if (busTopicParams.isAllowTracing()) {
288                 this.allowTracing = true;
289                 kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
290                         TracingConsumerInterceptor.class.getName());
291             }
292
293             consumer = new KafkaConsumer<>(kafkaProps);
294             //Subscribe to the topic
295             consumer.subscribe(List.of(busTopicParams.getTopic()));
296         }
297
298         @Override
299         public Iterable<String> fetch() {
300             ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
301             if (records == null || records.count() <= 0) {
302                 return Collections.emptyList();
303             }
304             List<String> messages = new ArrayList<>(records.count());
305             try {
306                 if (allowTracing) {
307                     createParentTraceContext(records);
308                 }
309
310                 for (TopicPartition partition : records.partitions()) {
311                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
312                     for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
313                         messages.add(partitionRecord.value());
314                     }
315                     long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
316                     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
317                 }
318             } catch (Exception e) {
319                 logger.error("{}: cannot fetch, throwing exception after sleep...", this);
320                 sleepAfterFetchFailure();
321                 throw e;
322             }
323             return messages;
324         }
325
326         private void createParentTraceContext(ConsumerRecords<String, String> records) {
327             TraceParentInfo traceParentInfo = new TraceParentInfo();
328             for (ConsumerRecord<String, String> consumerRecord : records) {
329
330                 Headers consumerRecordHeaders = consumerRecord.headers();
331                 traceParentInfo = processTraceParentHeader(consumerRecordHeaders);
332             }
333
334             SpanContext spanContext = SpanContext.createFromRemoteParent(
335                     traceParentInfo.getTraceId(), traceParentInfo.getSpanId(),
336                     TraceFlags.getSampled(), TraceState.builder().build());
337
338             Context.current().with(Span.wrap(spanContext)).makeCurrent();
339         }
340
341         private TraceParentInfo processTraceParentHeader(Headers headers) {
342             TraceParentInfo traceParentInfo = new TraceParentInfo();
343             if (headers.lastHeader("traceparent") != null) {
344                 traceParentInfo.setParentTraceId(new String(headers.lastHeader(
345                         "traceparent").value(), StandardCharsets.UTF_8));
346
347                 String[] parts = traceParentInfo.getParentTraceId().split("-");
348                 traceParentInfo.setTraceId(parts[1]);
349                 traceParentInfo.setSpanId(parts[2]);
350             }
351
352             return traceParentInfo;
353         }
354
355         @Data
356         @NoArgsConstructor
357         private static class TraceParentInfo {
358             private String parentTraceId;
359             private String traceId;
360             private String spanId;
361         }
362
363         @Override
364         public void close() {
365             super.close();
366             this.consumer.close();
367             logger.info("Kafka Consumer exited {}", this);
368         }
369
370         @Override
371         public String toString() {
372             return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
373         }
374     }
375 }
376
377