update the testcases after the kafka 11 changes
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / backends / kafka / KafkaPublisher.java
@@ -8,18 +8,18 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+*  
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *
+ *  
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
  *  
  *******************************************************************************/
-package com.att.nsa.cambria.backends.kafka;
+package com.att.dmf.mr.backends.kafka;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,28 +27,32 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 
-import kafka.common.FailedToSendMessageException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.json.JSONException;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+import com.att.dmf.mr.backends.Publisher;
+import com.att.dmf.mr.constants.CambriaConstants;
 //import org.slf4j.Logger;
 //import org.slf4j.LoggerFactory;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.nsa.cambria.backends.Publisher;
-import com.att.nsa.cambria.constants.CambriaConstants;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
 
+//import kafka.FailedToSendMessageException;
+//import kafka.javaapi.producer.Producer;
+//import kafka.producer.KeyedMessage;
+//import kafka.producer.ProducerConfig;
+//import kafka.producer.KeyedMessage;
+
 /**
  * Sends raw JSON objects into Kafka.
  * 
  * Could improve space: BSON rather than JSON?
  * 
- * @author author
+ * @author peter
  *
  */
 
@@ -67,21 +71,32 @@ public class KafkaPublisher implements Publisher {
                transferSetting(fSettings, props, "request.required.acks", "1");
                transferSetting(fSettings, props, "message.send.max.retries", "5");
                transferSetting(fSettings, props, "retry.backoff.ms", "150"); */
-               String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list"); 
-               System.out.println("kafkaConnUrl:- "+kafkaConnUrl);
-               if(null==kafkaConnUrl){ 
-                       kafkaConnUrl="localhost:9092"; 
-               }               
-               transferSetting( props, "metadata.broker.list", kafkaConnUrl);
+               String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list");
+               if(null==kafkaConnUrl){
+                       
+                       kafkaConnUrl="localhost:9092";
+               }
+               //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf";
+        //     props.put("bootstrap.servers", bootSever);
+       //System.setProperty("java.security.auth.login.config",jaaspath);
+       
+               /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+               transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
+               transferSetting( props, "sasl.mechanism", "PLAIN");*/
+               transferSetting( props, "bootstrap.servers",kafkaConnUrl);
+                       //transferSetting( props, "metadata.broker.list", kafkaConnUrl);
                transferSetting( props, "request.required.acks", "1");
                transferSetting( props, "message.send.max.retries", "5");
                transferSetting(props, "retry.backoff.ms", "150"); 
 
-               props.put("serializer.class", "kafka.serializer.StringEncoder");
+               //props.put("serializer.class", "kafka.serializer.StringEncoder");
+               
+               props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
-               fConfig = new ProducerConfig(props);
-               fProducer = new Producer<String, String>(fConfig);
+               //fConfig = new ProducerConfig(props);
+               //fProducer = new Producer<String, String>(fConfig);
+               fProducer = new KafkaProducer<>(props);
        }
 
        /**
@@ -92,19 +107,19 @@ public class KafkaPublisher implements Publisher {
         * @throws JSONException
         */
        @Override
-       public void sendMessage(String topic, message msg) throws IOException, FailedToSendMessageException {
+       public void sendMessage(String topic, message msg) throws IOException{
                final List<message> msgs = new LinkedList<message>();
                msgs.add(msg);
                sendMessages(topic, msgs);
        }
 
-       /**
+       /**  
         * method publishing batch messages
-        * 
+       * This method is commented from 0.8 to 0.11 upgrade
         * @param topic
         * @param kms
         * throws IOException
-        */
+        *
        public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
                try {
                        fProducer.send(kms);
@@ -114,8 +129,26 @@ public class KafkaPublisher implements Publisher {
                        throw new FailedToSendMessageException(excp.getMessage(), excp);
                }
 
-       }
+       } */
+
 
+       /*
+        * Kafka 11.0 Interface
+        * @see com.att.nsa.cambria.backends.Publisher#sendBatchMessageNew(java.lang.String, java.util.ArrayList)
+        */
+       public void sendBatchMessageNew(String topic, ArrayList <ProducerRecord<String,String>> kms) throws IOException {
+               try {
+                       for (ProducerRecord<String,String> km : kms) {
+                               fProducer.send(km);
+                       }
+
+               } catch (Exception excp) { 
+                       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+                       throw new IOException(excp.getMessage(), excp);
+               }
+
+       }
+       
        /**
         * Send a set of messages. Each must have a "key" string value.
         * 
@@ -123,7 +156,7 @@ public class KafkaPublisher implements Publisher {
         * @param msg
         * @throws FailedToSendMessageException
         * @throws JSONException
-        */
+        *
        @Override
        public void sendMessages(String topic, List<? extends message> msgs)
                        throws IOException, FailedToSendMessageException {
@@ -141,11 +174,33 @@ public class KafkaPublisher implements Publisher {
                        log.error("Failed to send message(s) to topic [" + topic + "].", excp);
                        throw new FailedToSendMessageException(excp.getMessage(), excp);
                }
-       }
+       } */
+       @Override
+       public void sendMessagesNew(String topic, List<? extends message> msgs)
+                       throws IOException {
+               log.info("sending " + msgs.size() + " events to [" + topic + "]");
+try{
+               final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size());
+                       for (message o : msgs) {
+                       
+                       final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString());
+                       //kms.add(data);
+               
+               try {
+
+                       fProducer.send(data);
 
+               } catch (Exception excp) {
+                       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+                       throw new Exception(excp.getMessage(), excp);
+               }
+       }
+               
+       }catch(Exception e){}
+}
        //private final rrNvReadable fSettings;
 
-       private ProducerConfig fConfig;
+       //private ProducerConfig fConfig;
        private Producer<String, String> fProducer;
 
   /**
@@ -165,4 +220,16 @@ public class KafkaPublisher implements Publisher {
        //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);
 
        private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class);
-}
+
+       @Override
+       public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
+               // TODO Auto-generated method stub
+               
+       }
+
+       //@Override
+       //public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
+               // TODO Auto-generated method stub
+               
+       //}
+}
\ No newline at end of file