/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
 * Modifications Copyright (C) 2020,2023 Bell Canada. All rights reserved.
 * Modifications Copyright (C) 2022-2024 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
24 package org.onap.policy.common.endpoints.event.comm.bus.internal;
26 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
27 import java.util.Properties;
28 import java.util.UUID;
29 import org.apache.kafka.clients.producer.KafkaProducer;
30 import org.apache.kafka.clients.producer.Producer;
31 import org.apache.kafka.clients.producer.ProducerConfig;
32 import org.apache.kafka.clients.producer.ProducerRecord;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 public interface BusPublisher {
38 String NO_MESSAGE_PROVIDED = "No message provided";
39 String LOG_CLOSE = "{}: CLOSE";
40 String LOG_CLOSE_FAILED = "{}: CLOSE FAILED";
45 * @param partitionId id
46 * @param message the message
47 * @return true if success, false otherwise
48 * @throws IllegalArgumentException if no message provided
50 boolean send(String partitionId, String message);
53 * closes the publisher.
58 * Kafka based library publisher.
60 class KafkaPublisherWrapper implements BusPublisher {
62 private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
63 private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
65 private final String topic;
70 private final Producer<String, String> producer;
71 protected Properties kafkaProps;
74 * Kafka Publisher Wrapper.
76 * @param busTopicParams topic parameters
78 protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
80 if (busTopicParams.isTopicInvalid()) {
81 throw new IllegalArgumentException("No topic for Kafka");
84 this.topic = busTopicParams.getTopic();
86 // Setup Properties for consumer
87 kafkaProps = new Properties();
88 kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
89 if (busTopicParams.isAdditionalPropsValid()) {
90 kafkaProps.putAll(busTopicParams.getAdditionalProps());
92 if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
93 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
95 if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
96 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
99 if (busTopicParams.isAllowTracing()) {
100 kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
101 TracingProducerInterceptor.class.getName());
104 producer = new KafkaProducer<>(kafkaProps);
108 public boolean send(String partitionId, String message) {
109 if (message == null) {
110 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
115 ProducerRecord<String, String> producerRecord =
116 new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
118 this.producer.send(producerRecord);
120 } catch (Exception e) {
121 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
128 public void close() {
129 logger.info(LOG_CLOSE, this);
132 this.producer.close();
133 } catch (Exception e) {
134 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
139 public String toString() {
140 return "KafkaPublisherWrapper []";