[MR] Add support for configuring jaas.sasl.config at runtime
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / KafkaPublisher.java
index 62dc2a5..ac40603 100644 (file)
@@ -8,22 +8,23 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
-*  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *  
+ *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
 package org.onap.dmaap.dmf.mr.backends.kafka;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -43,9 +44,9 @@ import java.util.Properties;
 
 /**
  * Sends raw JSON objects into Kafka.
- * 
+ *
  * Could improve space: BSON rather than JSON?
- * 
+ *
  * @author peter
  *
  */
@@ -53,7 +54,7 @@ import java.util.Properties;
 public class KafkaPublisher implements Publisher {
        /**
         * constructor initializing
-        * 
+        *
         * @param settings
         * @throws rrNvReadable.missingReqdSetting
         */
@@ -62,35 +63,33 @@ public class KafkaPublisher implements Publisher {
                final Properties props = new Properties();
                String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list");
                if(StringUtils.isEmpty(kafkaConnUrl)){
-                       
+
                        kafkaConnUrl="localhost:9092";
                }
-               
-       
-           if(Utils.isCadiEnabled()){
-               transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
-               transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
-               transferSetting( props, "sasl.mechanism", "PLAIN");     
-           }
+
+
+               if(Utils.isCadiEnabled()){
+                       props.putAll(Utils.addSaslProps());
+               }
                transferSetting( props, "bootstrap.servers",kafkaConnUrl);
-                       
+
                transferSetting( props, "request.required.acks", "1");
                transferSetting( props, "message.send.max.retries", "5");
-               transferSetting(props, "retry.backoff.ms", "150"); 
+               transferSetting(props, "retry.backoff.ms", "150");
+
+
 
-               
-               
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
-               
-               
+
+
                fProducer = new KafkaProducer<>(props);
        }
 
        /**
         * Send a message with a given topic and key.
-        * 
+        *
         * @param msg
         * @throws FailedToSendMessageException
         * @throws JSONException
@@ -102,21 +101,21 @@ public class KafkaPublisher implements Publisher {
                sendMessages(topic, msgs);
        }
 
-       /**  
+       /**
         * method publishing batch messages
-       * This method is commented from 0.8 to 0.11 upgrade
+        * This method is commented from 0.8 to 0.11 upgrade
         * @param topic
         * @param kms
         * throws IOException
         *
        public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
-               try {
-                       fProducer.send(kms);
+       try {
+       fProducer.send(kms);
 
-               } catch (FailedToSendMessageException excp) { 
-                       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
-                       throw new FailedToSendMessageException(excp.getMessage(), excp);
-               }
+       } catch (FailedToSendMessageException excp) {
+       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+       throw new FailedToSendMessageException(excp.getMessage(), excp);
+       }
 
        } */
 
@@ -131,63 +130,63 @@ public class KafkaPublisher implements Publisher {
                                fProducer.send(km);
                        }
 
-               } catch (Exception excp) { 
+               } catch (Exception excp) {
                        log.error("Failed to send message(s) to topic [" + topic + "].", excp);
                        throw new IOException(excp.getMessage(), excp);
                }
 
        }
-       
+
        /**
         * Send a set of messages. Each must have a "key" string value.
-        * 
+        *
         * @param topic
         * @param msg
         * @throws FailedToSendMessageException
         * @throws JSONException
         *
+        @Override
+        public void sendMessages(String topic, List<? extends message> msgs)
+        throws IOException, FailedToSendMessageException {
+        log.info("sending " + msgs.size() + " events to [" + topic + "]");
+
+        final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
+        for (message o : msgs) {
+        final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
+        kms.add(data);
+        }
+        try {
+        fProducer.send(kms);
+
+        } catch (FailedToSendMessageException excp) {
+        log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+        throw new FailedToSendMessageException(excp.getMessage(), excp);
+        }
+        } */
        @Override
-       public void sendMessages(String topic, List<? extends message> msgs)
-                       throws IOException, FailedToSendMessageException {
+       public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
                log.info("sending " + msgs.size() + " events to [" + topic + "]");
-
-               final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
-               for (message o : msgs) {
-                       final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
-                       kms.add(data);
-               }
                try {
-                       fProducer.send(kms);
-
-               } catch (FailedToSendMessageException excp) {
-                       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
-                       throw new FailedToSendMessageException(excp.getMessage(), excp);
+                       for (message o : msgs) {
+                               final ProducerRecord<String, String> data =
+                                               new ProducerRecord<>(topic, o.getKey(), o.toString());
+                               fProducer.send(data);
+                       }
+               } catch (Exception e) {
+                       log.error("Failed to send message(s) to topic [" + topic + "].", e);
                }
-       } */
-       @Override
-    public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
-        log.info("sending " + msgs.size() + " events to [" + topic + "]");
-        try {
-            for (message o : msgs) {
-                final ProducerRecord<String, String> data =
-                        new ProducerRecord<>(topic, o.getKey(), o.toString());
-                fProducer.send(data);
-            }
-        } catch (Exception e) {
-            log.error("Failed to send message(s) to topic [" + topic + "].", e);
-        }
-    }
-
-       
+       }
+
+
        private Producer<String, String> fProducer;
 
-  /**
-   * It sets the key value pair
-   * @param topic
-   * @param msg 
-   * @param key
-   * @param defVal
-   */
+       /**
+        * It sets the key value pair
+        * @param topic
+        * @param msg
+        * @param key
+        * @param defVal
+        */
        private void transferSetting(Properties props, String key, String defVal) {
                String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
                if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal;
@@ -199,6 +198,6 @@ public class KafkaPublisher implements Publisher {
        @Override
        public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
                // TODO Auto-generated method stub
-               
+
        }
 }