2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
8 * Modifications Copyright (C) 2022-2024 Nordix Foundation.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import io.opentelemetry.api.trace.Span;
30 import io.opentelemetry.api.trace.SpanContext;
31 import io.opentelemetry.api.trace.TraceFlags;
32 import io.opentelemetry.api.trace.TraceState;
33 import io.opentelemetry.context.Context;
34 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
35 import java.io.IOException;
36 import java.net.MalformedURLException;
37 import java.nio.charset.StandardCharsets;
38 import java.security.GeneralSecurityException;
39 import java.time.Duration;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Properties;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
48 import lombok.NoArgsConstructor;
49 import org.apache.commons.lang3.StringUtils;
50 import org.apache.kafka.clients.consumer.ConsumerConfig;
51 import org.apache.kafka.clients.consumer.ConsumerRecord;
52 import org.apache.kafka.clients.consumer.ConsumerRecords;
53 import org.apache.kafka.clients.consumer.KafkaConsumer;
54 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
55 import org.apache.kafka.common.TopicPartition;
56 import org.apache.kafka.common.header.Headers;
57 import org.jetbrains.annotations.NotNull;
58 import org.onap.dmaap.mr.client.MRClientFactory;
59 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
60 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
61 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
62 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
63 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
68 * Wrapper around libraries to consume from message bus.
70 public interface BusConsumer {
/**
 * Fetches messages from the message bus.
 *
 * @return list of messages
 * @throws IOException when error encountered by underlying libraries
 */
public Iterable<String> fetch() throws IOException;
81 * close underlying library consumer.
86 * Consumer that handles fetch() failures by sleeping.
88 abstract class FetchingBusConsumer implements BusConsumer {
89 private static final Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
94 protected int fetchTimeout;
97 * Time to sleep on a fetch failure.
100 private final int sleepTime;
103 * Counted down when {@link #close()} is invoked.
105 private final CountDownLatch closeCondition = new CountDownLatch(1);
109 * Constructs the object.
111 * @param busTopicParams parameters for the bus topic
113 protected FetchingBusConsumer(BusTopicParams busTopicParams) {
114 this.fetchTimeout = busTopicParams.getFetchTimeout();
116 if (this.fetchTimeout <= 0) {
117 this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
119 // don't sleep too long, even if fetch timeout is large
120 this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
125 * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
126 * or the thread is interrupted, then this will return immediately.
128 protected void sleepAfterFetchFailure() {
130 logger.info("{}: backoff for {}ms", this, sleepTime);
131 if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
132 logger.info("{}: closed while handling fetch error", this);
135 } catch (InterruptedException e) {
136 logger.warn("{}: interrupted while handling fetch error", this, e);
137 Thread.currentThread().interrupt();
142 public void close() {
143 this.closeCondition.countDown();
148 * Cambria based consumer.
150 public static class CambriaConsumerWrapper extends FetchingBusConsumer {
155 private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
158 * Used to build the consumer.
160 private final ConsumerBuilder builder;
165 private final CambriaConsumer consumer;
168 * Cambria Consumer Wrapper.
169 * BusTopicParam object contains the following parameters
170 * servers - messaging bus hosts.
171 * topic - topic for messages
173 * apiSecret - API Secret
174 * consumerGroup - Consumer Group
175 * consumerInstance - Consumer Instance
176 * fetchTimeout - Fetch Timeout
177 * fetchLimit - Fetch Limit
179 * @param busTopicParams - The parameters for the bus topic
181 public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
182 super(busTopicParams);
184 this.builder = new CambriaClientBuilders.ConsumerBuilder();
186 builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
187 .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
188 .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
190 // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
191 builder.withSocketTimeout(fetchTimeout + 30000);
193 if (busTopicParams.isUseHttps()) {
194 builder.usingHttps();
196 if (busTopicParams.isAllowSelfSignedCerts()) {
197 builder.allowSelfSignedCertificates();
201 if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
202 builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
205 if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
206 builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
210 this.consumer = builder.build();
211 } catch (MalformedURLException | GeneralSecurityException e) {
212 throw new IllegalArgumentException(e);
217 public Iterable<String> fetch() throws IOException {
219 return this.consumer.fetch();
220 } catch (final IOException e) { //NOSONAR
221 logger.error("{}: cannot fetch because of {}", this, e.getMessage());
222 sleepAfterFetchFailure();
228 public void close() {
230 this.consumer.close();
234 public String toString() {
235 return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
240 * Kafka based consumer.
242 class KafkaConsumerWrapper extends FetchingBusConsumer {
247 private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
249 private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
254 protected KafkaConsumer<String, String> consumer;
255 protected Properties kafkaProps;
257 protected boolean allowTracing;
260 * Kafka Consumer Wrapper.
261 * BusTopicParam - object contains the following parameters
262 * servers - messaging bus hosts.
265 * @param busTopicParams - The parameters for the bus topic
267 public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
268 super(busTopicParams);
270 if (busTopicParams.isTopicInvalid()) {
271 throw new IllegalArgumentException("No topic for Kafka");
274 //Setup Properties for consumer
275 kafkaProps = new Properties();
276 kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
277 busTopicParams.getServers().get(0));
279 if (busTopicParams.isAdditionalPropsValid()) {
280 kafkaProps.putAll(busTopicParams.getAdditionalProps());
283 if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
284 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
286 if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
287 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
289 if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
290 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
292 if (busTopicParams.isAllowTracing()) {
293 this.allowTracing = true;
294 kafkaProps.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
295 TracingConsumerInterceptor.class.getName());
298 consumer = new KafkaConsumer<>(kafkaProps);
299 //Subscribe to the topic
300 consumer.subscribe(List.of(busTopicParams.getTopic()));
304 public Iterable<String> fetch() {
305 ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
306 if (records == null || records.count() <= 0) {
307 return Collections.emptyList();
309 List<String> messages = new ArrayList<>(records.count());
312 createParentTraceContext(records);
315 for (TopicPartition partition : records.partitions()) {
316 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
317 for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
318 messages.add(partitionRecord.value());
320 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
321 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
323 } catch (Exception e) {
324 logger.error("{}: cannot fetch, throwing exception after sleep...", this);
325 sleepAfterFetchFailure();
331 private void createParentTraceContext(ConsumerRecords<String, String> records) {
332 TraceParentInfo traceParentInfo = new TraceParentInfo();
333 for (ConsumerRecord<String, String> consumerRecord : records) {
335 Headers consumerRecordHeaders = consumerRecord.headers();
336 traceParentInfo = processTraceParentHeader(consumerRecordHeaders);
339 SpanContext spanContext = SpanContext.createFromRemoteParent(
340 traceParentInfo.getTraceId(), traceParentInfo.getSpanId(),
341 TraceFlags.getSampled(), TraceState.builder().build());
343 Context.current().with(Span.wrap(spanContext)).makeCurrent();
346 private TraceParentInfo processTraceParentHeader(Headers headers) {
347 TraceParentInfo traceParentInfo = new TraceParentInfo();
348 if (headers.lastHeader("traceparent") != null) {
349 traceParentInfo.setParentTraceId(new String(headers.lastHeader(
350 "traceparent").value(), StandardCharsets.UTF_8));
352 String[] parts = traceParentInfo.getParentTraceId().split("-");
353 traceParentInfo.setTraceId(parts[1]);
354 traceParentInfo.setSpanId(parts[2]);
357 return traceParentInfo;
362 private static class TraceParentInfo {
363 private String parentTraceId;
364 private String traceId;
365 private String spanId;
369 public void close() {
371 this.consumer.close();
372 logger.info("Kafka Consumer exited {}", this);
376 public String toString() {
377 return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
384 public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
389 private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
392 * Name of the "protocol" property.
394 protected static final String PROTOCOL_PROP = "Protocol";
399 protected MRConsumerImpl consumer;
402 * MR Consumer Wrapper.
404 * <p>servers - messaging bus hosts
407 * apiSecret - API Secret
408 * username - AAF Login
409 * password - AAF Password
410 * consumerGroup - Consumer Group
411 * consumerInstance - Consumer Instance
412 * fetchTimeout - Fetch Timeout
413 * fetchLimit - Fetch Limit
415 * @param busTopicParams contains above listed attributes
416 * @throws MalformedURLException URL should be valid
418 protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
419 super(busTopicParams);
421 if (busTopicParams.isTopicInvalid()) {
422 throw new IllegalArgumentException("No topic for DMaaP");
425 this.consumer = new MRConsumerImplBuilder()
426 .setHostPart(busTopicParams.getServers())
427 .setTopic(busTopicParams.getTopic())
428 .setConsumerGroup(busTopicParams.getConsumerGroup())
429 .setConsumerId(busTopicParams.getConsumerInstance())
430 .setTimeoutMs(busTopicParams.getFetchTimeout())
431 .setLimit(busTopicParams.getFetchLimit())
432 .setApiKey(busTopicParams.getApiKey())
433 .setApiSecret(busTopicParams.getApiSecret())
434 .createMRConsumerImpl();
436 this.consumer.setUsername(busTopicParams.getUserName());
437 this.consumer.setPassword(busTopicParams.getPassword());
441 public Iterable<String> fetch() {
442 final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
443 if (response == null) {
444 logger.warn("{}: DMaaP NULL response received", this);
446 sleepAfterFetchFailure();
447 return new ArrayList<>();
449 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
450 response.getResponseMessage());
452 if (!"200".equals(response.getResponseCode())) {
454 logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
455 response.getResponseMessage());
457 sleepAfterFetchFailure();
463 if (response.getActualMessages() == null) {
464 return new ArrayList<>();
466 return response.getActualMessages();
471 public void close() {
473 this.consumer.close();
477 public String toString() {
478 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
479 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
480 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
481 + consumer.getUsername() + "]";
488 class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
490 private static final Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
493 * BusTopicParams contain the following parameters.
494 * MR Consumer Wrapper.
496 * <p>servers messaging bus hosts
499 * apiSecret - API Secret
500 * aafLogin - AAF Login
501 * aafPassword - AAF Password
502 * consumerGroup - Consumer Group
503 * consumerInstance - Consumer Instance
504 * fetchTimeout - Fetch Timeout
505 * fetchLimit - Fetch Limit
507 * @param busTopicParams contains above listed params
508 * @throws MalformedURLException URL should be valid
510 public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
512 super(busTopicParams);
514 // super constructor sets servers = {""} if empty to avoid errors when using DME2
515 if (busTopicParams.isServersInvalid()) {
516 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
519 this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
521 Properties props = new Properties();
523 if (busTopicParams.isUseHttps()) {
524 props.setProperty(PROTOCOL_PROP, "https");
525 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
528 props.setProperty(PROTOCOL_PROP, "http");
529 this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
532 this.consumer.setProps(props);
533 logger.info("{}: CREATION", this);
537 public String toString() {
538 final MRConsumerImpl consumer = this.consumer;
540 return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
541 + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + consumer.getHost()
542 + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + ", consumer.getUsername()="
543 + consumer.getUsername() + "]";
547 class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
549 private static final Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
/**
 * Constructor.
 *
 * <p>Requires either a valid partner in the topic parameters or a DME2 route offer among
 * the additional properties. The first server entry is used as the DME2 service name.
 *
 * @param busTopicParams topic parameters
 * @throws MalformedURLException must provide a valid URL
 * @throws IllegalArgumentException if neither a partner nor a route offer is provided
 */
public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {

    super(busTopicParams);

    // no additional properties means no route offer
    final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
            ? busTopicParams.getAdditionalProps().get(
                    PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
            : null);

    BusHelper.validateBusTopicParams(busTopicParams, PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);

    if ((busTopicParams.isPartnerInvalid())
            && StringUtils.isBlank(dme2RouteOffer)) {
        throw new IllegalArgumentException(
                "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
                        + "." + busTopicParams.getTopic()
                        + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
                        + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
                        + busTopicParams.getTopic()
                        + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
    }

    final String serviceName = busTopicParams.getServers().get(0);

    this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());

    this.consumer.setUsername(busTopicParams.getUserName());
    this.consumer.setPassword(busTopicParams.getPassword());

    Properties props = getProperties(busTopicParams, serviceName, dme2RouteOffer);

    // the same properties are published on the static factory field and on this consumer
    MRClientFactory.prop = props;
    this.consumer.setProps(props);

    logger.info("{}: CREATION", this);
}
597 private static Properties getProperties(BusTopicParams busTopicParams, String serviceName,
598 String dme2RouteOffer) {
599 Properties props = new Properties();
601 props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
603 props.setProperty("username", busTopicParams.getUserName());
604 props.setProperty("password", busTopicParams.getPassword());
606 /* These are required, no defaults */
607 props.setProperty("topic", busTopicParams.getTopic());
609 BusHelper.setCommonProperties(busTopicParams, dme2RouteOffer, props);
611 props.setProperty("MethodType", "GET");
613 if (busTopicParams.isUseHttps()) {
614 props.setProperty(PROTOCOL_PROP, "https");
617 props.setProperty(PROTOCOL_PROP, "http");
620 props.setProperty("contenttype", "application/json");
622 if (busTopicParams.isAdditionalPropsValid()) {
623 props.putAll(busTopicParams.getAdditionalProps());