2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.common.message.bus.event.kafka;
21 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
22 import java.util.Properties;
23 import java.util.UUID;
24 import org.apache.kafka.clients.producer.KafkaProducer;
25 import org.apache.kafka.clients.producer.Producer;
26 import org.apache.kafka.clients.producer.ProducerConfig;
27 import org.apache.kafka.clients.producer.ProducerRecord;
28 import org.onap.policy.common.message.bus.event.base.BusPublisher;
29 import org.onap.policy.common.parameters.topic.BusTopicParams;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * Kafka based library publisher.
36 public class KafkaPublisherWrapper implements BusPublisher {
38 private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
39 private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
41 private final String topic;
46 private final Producer<String, String> producer;
47 protected Properties kafkaProps;
50 * Kafka Publisher Wrapper.
52 * @param busTopicParams topic parameters
54 public KafkaPublisherWrapper(BusTopicParams busTopicParams) {
56 if (busTopicParams.isTopicInvalid()) {
57 throw new IllegalArgumentException("No topic for Kafka");
60 this.topic = busTopicParams.getTopic();
62 // Setup Properties for consumer
63 kafkaProps = new Properties();
64 kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
65 if (busTopicParams.isAdditionalPropsValid()) {
66 kafkaProps.putAll(busTopicParams.getAdditionalProps());
69 if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
70 kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
73 if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
74 kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
77 if (busTopicParams.isAllowTracing()) {
78 kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
79 TracingProducerInterceptor.class.getName());
82 producer = new KafkaProducer<>(kafkaProps);
86 public boolean send(String partitionId, String message) {
87 if (message == null) {
88 throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
93 ProducerRecord<String, String> producerRecord =
94 new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
96 this.producer.send(producerRecord);
98 } catch (Exception e) {
99 logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
106 public void close() {
107 logger.info(LOG_CLOSE, this);
110 this.producer.close();
111 } catch (Exception e) {
112 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
117 public String toString() {
118 return "KafkaPublisherWrapper []";