86b9e936686747925058b2c15298f6880c09573b
[policy/clamp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2024 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.common.message.bus.event.kafka;
20
21 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
22 import java.util.Properties;
23 import java.util.UUID;
24 import org.apache.kafka.clients.producer.KafkaProducer;
25 import org.apache.kafka.clients.producer.Producer;
26 import org.apache.kafka.clients.producer.ProducerConfig;
27 import org.apache.kafka.clients.producer.ProducerRecord;
28 import org.onap.policy.common.message.bus.event.base.BusPublisher;
29 import org.onap.policy.common.parameters.topic.BusTopicParams;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Kafka based library publisher.
35  */
36 public class KafkaPublisherWrapper implements BusPublisher {
37
38     private static final Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
39     private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
40
41     private final String topic;
42
43     /**
44      * Kafka publisher.
45      */
46     private final Producer<String, String> producer;
47     protected Properties kafkaProps;
48
49     /**
50      * Kafka Publisher Wrapper.
51      *
52      * @param busTopicParams topic parameters
53      */
54     public KafkaPublisherWrapper(BusTopicParams busTopicParams) {
55
56         if (busTopicParams.isTopicInvalid()) {
57             throw new IllegalArgumentException("No topic for Kafka");
58         }
59
60         this.topic = busTopicParams.getTopic();
61
62         // Setup Properties for consumer
63         kafkaProps = new Properties();
64         kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
65         if (busTopicParams.isAdditionalPropsValid()) {
66             kafkaProps.putAll(busTopicParams.getAdditionalProps());
67         }
68
69         if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
70             kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
71         }
72
73         if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
74             kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
75         }
76
77         if (busTopicParams.isAllowTracing()) {
78             kafkaProps.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
79                 TracingProducerInterceptor.class.getName());
80         }
81
82         producer = new KafkaProducer<>(kafkaProps);
83     }
84
85     @Override
86     public boolean send(String partitionId, String message) {
87         if (message == null) {
88             throw new IllegalArgumentException(NO_MESSAGE_PROVIDED);
89         }
90
91         try {
92             // Create the record
93             ProducerRecord<String, String> producerRecord =
94                 new ProducerRecord<>(topic, UUID.randomUUID().toString(), message);
95
96             this.producer.send(producerRecord);
97             producer.flush();
98         } catch (Exception e) {
99             logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
100             return false;
101         }
102         return true;
103     }
104
105     @Override
106     public void close() {
107         logger.info(LOG_CLOSE, this);
108
109         try {
110             this.producer.close();
111         } catch (Exception e) {
112             logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
113         }
114     }
115
116     @Override
117     public String toString() {
118         return "KafkaPublisherWrapper []";
119     }
120
121 }