[KAFKA] Adding new client code
[dmaap/kafka11aaf.git] / kafkaClient / src / main / java / org / onap / dmaap / kafka / OnapKafkaProducer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * dmaap-kafka-client
4  * ================================================================================
5  * Copyright (C) 2023 Nordix Foundation. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dmaap.kafka;
22
23 import java.util.List;
24 import java.util.Properties;
25 import java.util.UUID;
26 import java.util.concurrent.ExecutionException;
27 import org.apache.kafka.clients.CommonClientConfigs;
28 import org.apache.kafka.clients.producer.KafkaProducer;
29 import org.apache.kafka.clients.producer.ProducerConfig;
30 import org.apache.kafka.clients.producer.ProducerRecord;
31 import org.apache.kafka.clients.producer.RecordMetadata;
32 import org.apache.kafka.common.KafkaException;
33 import org.apache.kafka.common.config.SaslConfigs;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Utility class that provides a KafkaProducer to communicate with a kafka cluster
39  */
40 public class OnapKafkaProducer {
41
42     private final Logger log = LoggerFactory.getLogger(OnapKafkaProducer.class);
43     private final KafkaProducer<String, String> producer;
44     private final List<String> producerTopics;
45
46     /**
47      *
48      * @param configuration The config provided to the client
49      */
50     public OnapKafkaProducer(IKafkaConfig configuration) {
51         producerTopics = configuration.getProducerTopics();
52         log.debug("Instantiating kafka producer for topics {}", producerTopics);
53         Properties props = new Properties();
54
55         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
56         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,  "org.apache.kafka.common.serialization.StringSerializer");
57         props.put(ProducerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-producer-" + UUID.randomUUID());
58         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig());
59         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getKafkaBootstrapServers());
60         props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig());
61         props.put(SaslConfigs.SASL_MECHANISM, configuration.getKafkaSaslMechanism());
62         props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
63         producer = new KafkaProducer<>(props);
64     }
65
66     /**
67      *
68      * @param topicName The name of the topic to publish the data to
69      * @param value The value of the data
70      * @return The RecordMetedata of the request
71      */
72     public RecordMetadata sendDataSynch(String topicName, String value) {
73         RecordMetadata data = null;
74         try {
75             data = producer.send(new ProducerRecord<>(topicName, value)).get();
76             log.debug("Data sent to topic {} at partition no {} and offset {}", topicName, data.partition(), data.offset());
77         } catch (KafkaException | ExecutionException | InterruptedException e) {
78             log.error("Failed the send data: exc {}", e.getMessage());
79         } finally {
80             producer.flush();
81         }
82         return data;
83     }
84 }