Fix sonar issues in messagerouter/msgrtr
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / KafkaPublisher.java
index 4bdd9f3..5f616c7 100644 (file)
@@ -26,21 +26,19 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.json.JSONException;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.util.StringUtils;
 import org.onap.dmaap.dmf.mr.backends.Publisher;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
 import org.onap.dmaap.dmf.mr.utils.Utils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.util.StringUtils;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import kafka.common.FailedToSendMessageException;
 
 
 
@@ -84,7 +82,7 @@ public class KafkaPublisher implements Publisher {
                
                
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+               props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
                
                
@@ -100,7 +98,7 @@ public class KafkaPublisher implements Publisher {
         */
        @Override
        public void sendMessage(String topic, message msg) throws IOException{
-               final List<message> msgs = new LinkedList<message>();
+               final List<message> msgs = new LinkedList<>();
                msgs.add(msg);
                sendMessages(topic, msgs);
        }
@@ -168,29 +166,18 @@ public class KafkaPublisher implements Publisher {
                }
        } */
        @Override
-       public void sendMessagesNew(String topic, List<? extends message> msgs)
-                       throws IOException {
-               log.info("sending " + msgs.size() + " events to [" + topic + "]");
-try{
-               final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
-                       for (message o : msgs) {
-                       
-                       final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
-                       
-               
-               try {
-
-                       fProducer.send(data);
-
-               } catch (Exception excp) {
-                       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
-                       throw new Exception(excp.getMessage(), excp);
-               }
-       }
-               
-       }catch(Exception e){}
-}
-       //private final rrNvReadable fSettings;
+    public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
+        log.info("sending " + msgs.size() + " events to [" + topic + "]");
+        try {
+            for (message o : msgs) {
+                final ProducerRecord<String, String> data =
+                        new ProducerRecord<>(topic, o.getKey(), o.toString());
+                fProducer.send(data);
+            }
+        } catch (Exception e) {
+            log.error("Failed to send message(s) to topic [" + topic + "].", e);
+        }
+    }
 
        
        private Producer<String, String> fProducer;
@@ -203,14 +190,11 @@ try{
    * @param defVal
    */
        private void transferSetting(Properties props, String key, String defVal) {
-               String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
-               if (StringUtils.isEmpty(kafka_prop)) kafka_prop=defVal;
-               //props.put(key, settings.getString("kafka." + key, defVal));
-               props.put(key, kafka_prop);
+               String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
+               if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal;
+               props.put(key, kafkaProp);
        }
 
-       //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);
-
        private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class);
 
        @Override
@@ -218,6 +202,4 @@ try{
                // TODO Auto-generated method stub
                
        }
-
-       
-}
\ No newline at end of file
+}