[KAFKA] Add docker-compose to sample project
[dmaap/kafka11aaf.git] / kafkaClient / src / main / java / org / onap / dmaap / kafka / OnapKafkaClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2022 Nordix Foundation. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dmaap.kafka;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import org.apache.kafka.clients.producer.RecordMetadata;
25 import org.apache.kafka.common.KafkaException;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  * Utility class that provides a handler for Kafka interactions
31  */
32 public class OnapKafkaClient {
33
34     private final Logger log = LoggerFactory.getLogger(OnapKafkaClient.class.getName());
35
36     private OnapKafkaConsumer onapKafkaConsumer = null;
37
38     private final OnapKafkaProducer onapKafkaProducer;
39
40     public OnapKafkaClient(IKafkaConfig configuration) {
41         if (!configuration.getConsumerTopics().isEmpty()) {
42             onapKafkaConsumer = new OnapKafkaConsumer(configuration);
43             onapKafkaConsumer.subscribeConsumerToTopics();
44         }
45         onapKafkaProducer = new OnapKafkaProducer(configuration);
46     }
47
48     /**
49      * @param topicName The topic from which messages will be fetched
50      * @return A list of messages from a specific topic
51      */
52     public List<String> fetchFromTopic(String topicName) {
53         List<String> messages =  new ArrayList<>();
54         if (onapKafkaConsumer != null) {
55             try {
56                 log.debug("Polling for messages from topic: {}", topicName);
57                 messages = onapKafkaConsumer.poll(topicName);
58                 log.debug("Returning messages from topic {}", topicName);
59                 return messages;
60             } catch (KafkaException e) {
61                 log.error("Failed to fetch from kafka for topic: {}", topicName, e);
62             }
63         } else {
64             log.error("Consumer has not been initialised with the required topic list");
65         }
66         return messages;
67     }
68
69     /**
70      * Publish data to a given topic
71      *  @param topicName The topic to which the message should be published
72      * @param data      The data to publish to the topic specified
73      * @return The RecordMetedata of the request
74      */
75     public RecordMetadata publishToTopic(String topicName, String data) {
76         // Should we check the data size and chunk it if necessary? Do we need to?
77         return onapKafkaProducer.sendDataSynch(topicName, data);
78     }
79 }