X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fmessagerouter%2Fmsgrtr.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fbackends%2Fkafka%2FKafkaPublisher.java;h=5f616c74814a6c40dccd3ce7f2b4dd8a871b12c0;hp=4bdd9f333aa1404e2ddcb5776b64c41a2106928d;hb=db3bff2db9380164b21772398c5e67e423a1d58d;hpb=ab3015141a2cb42308f1d09dc3a7edc8c10cf5c7 diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java index 4bdd9f3..5f616c7 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java @@ -26,21 +26,19 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Properties; - import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.json.JSONException; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.util.StringUtils; import org.onap.dmaap.dmf.mr.backends.Publisher; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.utils.Utils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.util.StringUtils; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; +import kafka.common.FailedToSendMessageException; @@ -84,7 +82,7 @@ public class KafkaPublisher implements Publisher { props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @@ -100,7 +98,7 @@ public class KafkaPublisher implements Publisher { */ @Override public void sendMessage(String topic, message msg) throws IOException{ - final List msgs = new LinkedList(); + final List msgs = new LinkedList<>(); msgs.add(msg); sendMessages(topic, msgs); } @@ -168,29 +166,18 @@ public class KafkaPublisher implements Publisher { } } */ @Override - public void sendMessagesNew(String topic, List msgs) - throws IOException { - log.info("sending " + msgs.size() + " events to [" + topic + "]"); -try{ - final List> kms = new ArrayList<>(msgs.size()); - for (message o : msgs) { - - final ProducerRecord data = new ProducerRecord<>(topic, o.getKey(), o.toString()); - - - try { - - fProducer.send(data); - - } catch (Exception excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new Exception(excp.getMessage(), excp); - } - } - - }catch(Exception e){} -} - //private final rrNvReadable fSettings; + public void sendMessagesNew(String topic, List msgs) throws IOException { + log.info("sending " + msgs.size() + " events to [" + topic + "]"); + try { + for (message o : msgs) { + final ProducerRecord data = + new ProducerRecord<>(topic, o.getKey(), o.toString()); + fProducer.send(data); + } + } catch (Exception e) { + log.error("Failed to send message(s) to topic [" + topic + "].", e); + } + } private Producer fProducer; @@ -203,14 +190,11 @@ try{ * @param defVal */ private void transferSetting(Properties props, String key, String defVal) { - String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); - if (StringUtils.isEmpty(kafka_prop)) kafka_prop=defVal; - //props.put(key, settings.getString("kafka." + key, defVal)); - props.put(key, kafka_prop); + String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); + if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal; + props.put(key, kafkaProp); } - //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class); @Override @@ -218,6 +202,4 @@ try{ // TODO Auto-generated method stub } - - -} \ No newline at end of file +}