[MR] Add support for configuring jaas.sasl.config at runtime 06/124706/6
author    seanfos <sean.osullivan@est.tech>
          Wed, 6 Oct 2021 15:09:15 +0000 (16:09 +0100)
committer seanfos <sean.osullivan@est.tech>
          Mon, 11 Oct 2021 10:08:55 +0000 (11:08 +0100)
Signed-off-by: seanfos <sean.osullivan@est.tech>
Change-Id: I92a6fdb9e375db7b355e19127a5fdbe2b4d2a827
Issue-ID: DMAAP-1653

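In brief: the PLAIN sasl.jaas.config that was hard-coded in KafkaPublisher,
DMaaPKafkaConsumerFactory and DMaaPKafkaMetaBroker is replaced by a single
Utils.addSaslProps() helper that resolves the SASL settings from the SASLMECH
and JAASLOGIN environment variables at runtime. A minimal caller-side sketch,
with illustrative (not default) environment values:

    // Assumed environment, for illustration only:
    //   enableCadi=true
    //   SASLMECH=scram-sha-512
    //   JAASLOGIN=org.apache.kafka.common.security.scram.ScramLoginModule required username='u' password='p';
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    if (Utils.isCadiEnabled()) {
        props.putAll(Utils.addSaslProps()); // adds sasl.jaas.config, sasl.mechanism, security.protocol
    }
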
pom.xml
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java
src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java
src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java

diff --git a/pom.xml b/pom.xml
index 6035e25..12e4b98 100644
--- a/pom.xml
+++ b/pom.xml
                        <artifactId>slf4j-api</artifactId>
                        <version>1.7.32</version>
                </dependency>
+               <dependency>
+                       <groupId>com.github.stefanbirkner</groupId>
+                       <artifactId>system-rules</artifactId>
+                       <version>1.17.2</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
        <profiles>
                <!-- Use this profile to run the AJSC locally. This profile can be successfully
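
The new com.github.stefanbirkner:system-rules test dependency provides the
JUnit 4 EnvironmentVariables rule that UtilsTest (further down) uses to stub
SASLMECH and JAASLOGIN without touching the real process environment. A usage
sketch:

    @Rule
    public EnvironmentVariables environmentVariables = new EnvironmentVariables();

    @Test
    public void resolvesScramFromEnv() {
        environmentVariables.set("SASLMECH", "scram-sha-512");
        // code under test now sees the stubbed value via System.getenv("SASLMECH")
    }
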
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
index 62dc2a5..ac40603 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -8,22 +8,23 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
-*  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *  
+ *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
 package org.onap.dmaap.dmf.mr.backends.kafka;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -43,9 +44,9 @@ import java.util.Properties;
 
 /**
  * Sends raw JSON objects into Kafka.
- * 
+ *
  * Could improve space: BSON rather than JSON?
- * 
+ *
  * @author peter
  *
  */
@@ -53,7 +54,7 @@ import java.util.Properties;
 public class KafkaPublisher implements Publisher {
        /**
         * constructor initializing
-        * 
+        *
         * @param settings
         * @throws rrNvReadable.missingReqdSetting
         */
@@ -62,35 +63,33 @@ public class KafkaPublisher implements Publisher {
                final Properties props = new Properties();
                String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list");
                if(StringUtils.isEmpty(kafkaConnUrl)){
-                       
+
                        kafkaConnUrl="localhost:9092";
                }
-               
-       
-           if(Utils.isCadiEnabled()){
-               transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
-               transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
-               transferSetting( props, "sasl.mechanism", "PLAIN");     
-           }
+
+
+               if(Utils.isCadiEnabled()){
+                       props.putAll(Utils.addSaslProps());
+               }
                transferSetting( props, "bootstrap.servers",kafkaConnUrl);
-                       
+
                transferSetting( props, "request.required.acks", "1");
                transferSetting( props, "message.send.max.retries", "5");
-               transferSetting(props, "retry.backoff.ms", "150"); 
+               transferSetting(props, "retry.backoff.ms", "150");
+
+
 
-               
-               
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
-               
-               
+
+
                fProducer = new KafkaProducer<>(props);
        }
 
        /**
         * Send a message with a given topic and key.
-        * 
+        *
         * @param msg
         * @throws FailedToSendMessageException
         * @throws JSONException
@@ -102,21 +101,21 @@ public class KafkaPublisher implements Publisher {
                sendMessages(topic, msgs);
        }
 
-       /**  
+       /**
         * method publishing batch messages
-       * This method is commented from 0.8 to 0.11 upgrade
+        * This method is commented from 0.8 to 0.11 upgrade
         * @param topic
         * @param kms
         * throws IOException
         *
        public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
-               try {
-                       fProducer.send(kms);
+       try {
+       fProducer.send(kms);
 
-               } catch (FailedToSendMessageException excp) { 
-                       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
-                       throw new FailedToSendMessageException(excp.getMessage(), excp);
-               }
+       } catch (FailedToSendMessageException excp) {
+       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+       throw new FailedToSendMessageException(excp.getMessage(), excp);
+       }
 
        } */
 
@@ -131,63 +130,63 @@ public class KafkaPublisher implements Publisher {
                                fProducer.send(km);
                        }
 
-               } catch (Exception excp) { 
+               } catch (Exception excp) {
                        log.error("Failed to send message(s) to topic [" + topic + "].", excp);
                        throw new IOException(excp.getMessage(), excp);
                }
 
        }
-       
+
        /**
         * Send a set of messages. Each must have a "key" string value.
-        * 
+        *
         * @param topic
         * @param msg
         * @throws FailedToSendMessageException
         * @throws JSONException
         *
+        @Override
+        public void sendMessages(String topic, List<? extends message> msgs)
+        throws IOException, FailedToSendMessageException {
+        log.info("sending " + msgs.size() + " events to [" + topic + "]");
+
+        final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
+        for (message o : msgs) {
+        final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
+        kms.add(data);
+        }
+        try {
+        fProducer.send(kms);
+
+        } catch (FailedToSendMessageException excp) {
+        log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+        throw new FailedToSendMessageException(excp.getMessage(), excp);
+        }
+        } */
        @Override
-       public void sendMessages(String topic, List<? extends message> msgs)
-                       throws IOException, FailedToSendMessageException {
+       public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
                log.info("sending " + msgs.size() + " events to [" + topic + "]");
-
-               final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
-               for (message o : msgs) {
-                       final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
-                       kms.add(data);
-               }
                try {
-                       fProducer.send(kms);
-
-               } catch (FailedToSendMessageException excp) {
-                       log.error("Failed to send message(s) to topic [" + topic + "].", excp);
-                       throw new FailedToSendMessageException(excp.getMessage(), excp);
+                       for (message o : msgs) {
+                               final ProducerRecord<String, String> data =
+                                               new ProducerRecord<>(topic, o.getKey(), o.toString());
+                               fProducer.send(data);
+                       }
+               } catch (Exception e) {
+                       log.error("Failed to send message(s) to topic [" + topic + "].", e);
                }
-       } */
-       @Override
-    public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
-        log.info("sending " + msgs.size() + " events to [" + topic + "]");
-        try {
-            for (message o : msgs) {
-                final ProducerRecord<String, String> data =
-                        new ProducerRecord<>(topic, o.getKey(), o.toString());
-                fProducer.send(data);
-            }
-        } catch (Exception e) {
-            log.error("Failed to send message(s) to topic [" + topic + "].", e);
-        }
-    }
-
-       
+       }
+
+
        private Producer<String, String> fProducer;
 
-  /**
-   * It sets the key value pair
-   * @param topic
-   * @param msg 
-   * @param key
-   * @param defVal
-   */
+       /**
+        * It sets the key value pair
+        * @param topic
+        * @param msg
+        * @param key
+        * @param defVal
+        */
        private void transferSetting(Properties props, String key, String defVal) {
                String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
                if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal;
@@ -199,6 +198,6 @@ public class KafkaPublisher implements Publisher {
        @Override
        public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
                // TODO Auto-generated method stub
-               
+
        }
 }
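
Two things worth noting in KafkaPublisher: the constructor now merges
Utils.addSaslProps() into the producer configuration instead of building the
JAAS line inline, and the active send path is sendMessagesNew(), which logs
send failures rather than rethrowing them (the 0.8-era sendMessages remains
commented out). Condensed, the effective producer setup when CADI is enabled
looks like this (transferSetting consults the msgrtr properties first; the
sketch shows the defaults):

    Properties props = new Properties();
    if (Utils.isCadiEnabled()) {
        props.putAll(Utils.addSaslProps());   // resolved from the environment at startup
    }
    props.put("bootstrap.servers", kafkaConnUrl); // "localhost:9092" fallback
    props.put("request.required.acks", "1");
    props.put("message.send.max.retries", "5");
    props.put("retry.backoff.ms", "150");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
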
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 26a8cf4..6f5a17c 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -8,16 +8,16 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
-*  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *  
+ *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
 package org.onap.dmaap.dmf.mr.beans;
 
@@ -27,6 +27,7 @@ import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.onap.dmaap.dmf.mr.CambriaApiException;
 import org.onap.dmaap.dmf.mr.backends.Consumer;
@@ -52,13 +53,13 @@ import java.util.concurrent.TimeUnit;
  */
 public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
-       
+
        private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
-       
+
 
        /**
         * constructor initialization
-        * 
+        *
         * @param settings
         * @param metrics
         * @param curator
@@ -68,8 +69,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
         */
 
        public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
-                       @Qualifier("curator") CuratorFramework curator,
-                       @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
+                                                                        @Qualifier("curator") CuratorFramework curator,
+                                                                        @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
                        throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
 
                String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
@@ -97,7 +98,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
                final boolean isCacheEnabled = kSetting_EnableCache;
 
-               
+
                fCache = null;
                if (isCacheEnabled) {
                        fCache = KafkaConsumerCache.getInstance();
@@ -108,15 +109,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
                        fCache.setfApiId(apiNodeId);
                        fCache.startCache(mode, curator);
                        if(kafkaLiveLockAvoider!=null){
-                       kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
-                       fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
+                               kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
+                               fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
                        }
                }
        }
 
        /*
         * getConsumerFor
-        * 
+        *
         * @see
         * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
         * java.lang.String, java.lang.String, int, java.lang.String) This method is
@@ -128,7 +129,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
         */
        @Override
        public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
-                       String remotehost) throws UnavailableException, CambriaApiException {
+                                                                  String remotehost) throws UnavailableException, CambriaApiException {
                Kafka011Consumer kc;
 
                // To synchronize based on the consumer group.
@@ -179,11 +180,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
                                        log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
                                                        + "], on topic [" + topic + "].");
-                                       
+
                                        if (fCache != null) {
                                                fCache.signalOwnership(topic, consumerGroupName, consumerId);
                                        }
-                                       
+
                                        final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
                                        long fCreateTimeMs = System.currentTimeMillis();
                                        KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
@@ -210,7 +211,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
                                                        + consumerId);
                                        log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
                                                        + consumerId);
-                                       
+
                                } finally {
                                        if (locked) {
                                                try {
@@ -242,7 +243,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
                fCache.dropAllConsumers();
        }
 
-       
+
        private KafkaConsumerCache fCache;
        private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
        private String fkafkaBrokers;
@@ -256,38 +257,36 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
        private void transferSettingIfProvided(Properties target, String key, String prefix) {
                String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
 
-               
+
                if (null != keyVal) {
-               
+
                        log.info("Setting [" + key + "] to " + keyVal + ".");
                        target.put(key, keyVal);
                }
        }
 
        /**
-        * Name CreateConsumerconfig  
+        * Name CreateConsumerconfig
         * @param topic
         * @param groupId
         * @param consumerId
         * @return Properties
-        * 
+        *
         * This method is to create Properties required to create kafka connection
-        * Group name is replaced with different format groupid--topic to address same 
-        * groupids for multiple topics. Same groupid with multiple topics 
+        * Group name is replaced with different format groupid--topic to address same
+        * groupids for multiple topics. Same groupid with multiple topics
         * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique
         */
        private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
                final Properties props = new Properties();
                //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic
-               //Fix for CPFMF-644 : 
+               //Fix for CPFMF-644 :
                final String fakeGroupName = groupId + "--" + topic;
                props.put("group.id", fakeGroupName);
                props.put("enable.auto.commit", "false"); // 0.11
                props.put("bootstrap.servers", fkafkaBrokers);
                if(Utils.isCadiEnabled()){
-               props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
-               props.put("security.protocol", "SASL_PLAINTEXT");
-               props.put("sasl.mechanism", "PLAIN");
+                       props.putAll(Utils.addSaslProps());
                }
                props.put("client.id", consumerId);
 
@@ -314,7 +313,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
        /**
         * putting values in hashmap like consumer timeout, zookeeper time out, etc
-        * 
+        *
         * @param setting
         */
        private static void populateKafkaInternalDefaultsMap() { }
@@ -322,7 +321,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
        /*
         * The starterIncremnt value is just to emulate calling certain consumers,
         * in this test app all the consumers are local
-        * 
+        *
         */
        private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
 
@@ -346,7 +345,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
        @SuppressWarnings("rawtypes")
        @Override
        public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
-                       String remotehost) throws UnavailableException, CambriaApiException {
+                                                                                 String remotehost) throws UnavailableException, CambriaApiException {
                // TODO Auto-generated method stub
                return null;
        }
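
The group renaming in createConsumerConfig is easiest to see by example: group
"cg1" reading topics "events" and "alarms" gets the Kafka group.ids
"cg1--events" and "cg1--alarms", so a rebalance on one topic cannot ripple
into the other. A condensed sketch of the consumer properties this factory now
builds:

    Properties props = new Properties();
    props.put("group.id", groupId + "--" + topic); // e.g. "cg1--events"
    props.put("enable.auto.commit", "false");
    props.put("bootstrap.servers", fkafkaBrokers);
    if (Utils.isCadiEnabled()) {
        props.putAll(Utils.addSaslProps());        // runtime SASL settings, as above
    }
    props.put("client.id", consumerId);
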
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 9ab4c83..7a08345 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -8,16 +8,16 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
-*  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *  
+ *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
 package org.onap.dmaap.dmf.mr.beans;
 
@@ -55,7 +55,7 @@ import java.util.concurrent.ExecutionException;
 
 /**
  * class performing all topic operations
- * 
+ *
  * @author anowarul.islam
  *
  */
@@ -73,32 +73,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                        fkafkaBrokers = "localhost:9092";
                }
-               
-            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-            if(Utils.isCadiEnabled()){
-            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
-                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
-            props.put("sasl.mechanism", "PLAIN");
-            }
-          
-            fKafkaAdminClient=AdminClient.create ( props );
-           
+
+               props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+               if(Utils.isCadiEnabled()){
+                       props.putAll(Utils.addSaslProps());
+               }
+               fKafkaAdminClient=AdminClient.create ( props );
+
        }
 
        private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
        private final AdminClient fKafkaAdminClient;
-       
-       
+
+
 
        /**
         * DMaaPKafkaMetaBroker constructor initializing
-        * 
+        *
         * @param settings
         * @param zk
         * @param configDb
         */
        public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
-                       @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+                                                               @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
@@ -109,30 +106,28 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                        fkafkaBrokers = "localhost:9092";
                }
-               
-                if(Utils.isCadiEnabled()){
-                props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
-                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
-            props.put("sasl.mechanism", "PLAIN");
-                }
-            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-            
-            fKafkaAdminClient=AdminClient.create ( props );
-           
-               
-               
+
+               if(Utils.isCadiEnabled()){
+                       props.putAll(Utils.addSaslProps());
+               }
+               props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
+               fKafkaAdminClient=AdminClient.create ( props );
+
+
+
        }
-       
+
        public DMaaPKafkaMetaBroker( rrNvReadable settings,
-                       ZkClient zk,  ConfigDb configDb,AdminClient client) {
-               
+                                                                ZkClient zk,  ConfigDb configDb,AdminClient client) {
+
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
-           fKafkaAdminClient= client;
-          
-               
-               
+               fKafkaAdminClient= client;
+
+
+
        }
 
        @Override
@@ -169,7 +164,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
        /**
         * static method get KafkaTopic object
-        * 
+        *
         * @param db
         * @param base
         * @param topic
@@ -185,7 +180,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         */
        @Override
        public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
-                       boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+                                                        boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
                log.info("Creating topic: " + topic);
                try {
                        log.info("Check if topic [" + topic + "] exist.");
@@ -216,23 +211,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                // create via kafka
 
-        try {
-            final NewTopic topicRequest =
-                    new NewTopic(topic, partitions, (short)replicas);
-            final CreateTopicsResult ctr =
-                    fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
-            final KafkaFuture<Void> ctrResult = ctr.all();
-            ctrResult.get();
-            // underlying Kafka topic created. now setup our API info
-            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
-        } catch (InterruptedException e) {
-            log.warn("Execution of describeTopics timed out.");
-            throw new ConfigDbException(e);
-        } catch (ExecutionException e) {
-            log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
-            throw new ConfigDbException(e.getCause());
-        }
-               
+               try {
+                       final NewTopic topicRequest =
+                                       new NewTopic(topic, partitions, (short)replicas);
+                       final CreateTopicsResult ctr =
+                                       fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+                       final KafkaFuture<Void> ctrResult = ctr.all();
+                       ctrResult.get();
+                       // underlying Kafka topic created. now setup our API info
+                       return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+               } catch (InterruptedException e) {
+                       log.warn("Execution of describeTopics timed out.");
+                       throw new ConfigDbException(e);
+               } catch (ExecutionException e) {
+                       log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+                       throw new ConfigDbException(e.getCause());
+               }
+
        }
 
        @Override
@@ -240,13 +235,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                log.info("Deleting topic: " + topic);
                try {
                        log.info("Loading zookeeper client for topic deletion.");
-                                       // topic creation. (Otherwise, the topic is only partially created
+                       // topic creation. (Otherwise, the topic is only partially created
                        // in ZK.)
-                       
-                       
+
+
                        fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
                        log.info("Zookeeper client loaded successfully. Deleting topic.");
-                       
+
                } catch (Exception e) {
                        log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
                        throw new ConfigDbException(e);
@@ -265,7 +260,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
        /**
         * method Providing KafkaTopic Object associated with owner and
         * transactionenabled or not
-        * 
+        *
         * @param name
         * @param desc
         * @param owner
@@ -280,7 +275,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
        /**
         * static method giving kafka topic object
-        * 
+        *
         * @param db
         * @param basePath
         * @param name
@@ -291,7 +286,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         * @throws ConfigDbException
         */
        public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
-                       boolean transactionEnabled) throws ConfigDbException {
+                                                                                         boolean transactionEnabled) throws ConfigDbException {
                final JSONObject o = new JSONObject();
                o.put("owner", owner);
                o.put("description", desc);
@@ -303,14 +298,14 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
        /**
         * class performing all user opearation like user is eligible to read,
         * write. permitting a user to write and read,
-        * 
+        *
         * @author anowarul.islam
         *
         */
        public static class KafkaTopic implements Topic {
                /**
                 * constructor initializes
-                * 
+                *
                 * @param name
                 * @param configdb
                 * @param baseTopic
@@ -330,26 +325,26 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        fOwner = o.optString("owner", "");
                        fDesc = o.optString("description", "");
                        fTransactionEnabled = o.optBoolean("txenabled", false);// default
-                                                                                                                                       // value is
-                                                                                                                                       // false
+                       // value is
+                       // false
                        // if this topic has an owner, it needs both read/write ACLs. If there's no
-                                               // owner (or it's empty), null is okay -- this is for existing or implicitly
-                                               // created topics.
-                                               JSONObject readers = o.optJSONObject ( "readers" );
-                                               if ( readers == null && fOwner.length () > 0 )
-                                               {
-                                                   readers = kEmptyAcl;
-                                               }
-                                               fReaders =  fromJson ( readers );
-
-                                               JSONObject writers = o.optJSONObject ( "writers" );
-                                               if ( writers == null && fOwner.length () > 0 )
-                                               {
-                                                   writers = kEmptyAcl;
-                                               }
-                                               fWriters = fromJson ( writers );
+                       // owner (or it's empty), null is okay -- this is for existing or implicitly
+                       // created topics.
+                       JSONObject readers = o.optJSONObject ( "readers" );
+                       if ( readers == null && fOwner.length () > 0 )
+                       {
+                               readers = kEmptyAcl;
+                       }
+                       fReaders =  fromJson ( readers );
+
+                       JSONObject writers = o.optJSONObject ( "writers" );
+                       if ( writers == null && fOwner.length () > 0 )
+                       {
+                               writers = kEmptyAcl;
+                       }
+                       fWriters = fromJson ( writers );
                }
-               
+
                private NsaAcl fromJson(JSONObject o) {
                        NsaAcl acl = new NsaAcl();
                        if (o != null) {
@@ -427,7 +422,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        try
                        {
                                final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-       
+
                                // we have to assume we have current data, or load it again. for the expected use
                                // case, assuming we can overwrite the data is fine.
                                final JSONObject o = new JSONObject ();
@@ -435,15 +430,15 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
                                o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
                                fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
-                               
+
                                log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-       
+
                        }
                        catch ( ConfigDbException | AccessDeniedException x )
                        {
                                throw x;
                        }
-                       
+
                }
 
                private JSONObject safeSerialize(NsaAcl acl) {
@@ -458,7 +453,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                private final NsaAcl fReaders;
                private final NsaAcl fWriters;
                private boolean fTransactionEnabled;
-       
+
                public boolean isTransactionEnabled() {
                        return fTransactionEnabled;
                }
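
Both DMaaPKafkaMetaBroker constructors now create their AdminClient the same
way; condensed:

    final Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);
    if (Utils.isCadiEnabled()) {
        props.putAll(Utils.addSaslProps());
    }
    fKafkaAdminClient = AdminClient.create(props);

(Unrelated to this change: the re-indented createTopic block still logs
"Execution of describeTopics ..." although the awaited call is createTopics;
that wording predates this commit.)
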
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
index 662f0f7..c420072 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
@@ -8,30 +8,30 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
-*  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *  
+ *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
 package org.onap.dmaap.dmf.mr.utils;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
-
-import javax.servlet.http.HttpServletRequest;
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.Principal;
 import java.text.DecimalFormat;
 import java.text.SimpleDateFormat;
 import java.util.*;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
 
 /**
  * This is an utility class for various operations for formatting
@@ -46,13 +46,14 @@ public class Utils {
        private static final String BATCH_ID_FORMAT = "000000";
        private static final String X509_ATTR = "javax.servlet.request.X509Certificate";
        private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class);
+       public static final String SASL_MECH = "sasl.mechanism";
 
        private Utils() {
                super();
        }
 
        /**
-        * Formatting the date 
+        * Formatting the date
         * @param date
         * @return date or null
         */
@@ -128,55 +129,71 @@ public class Utils {
         */
        public static long getSleepMsForRate ( double ratePerMinute )
        {
-               if ( ratePerMinute <= 0.0 ) 
+               if ( ratePerMinute <= 0.0 )
                {
                        return 0;
                }
                return Math.max ( 1000, Math.round ( 60 * 1000 / ratePerMinute ) );
        }
 
-         public static String getRemoteAddress(DMaaPContext ctx)
-         {
-           String reqAddr = ctx.getRequest().getRemoteAddr();
-           String fwdHeader = getFirstHeader("X-Forwarded-For",ctx);
-           return ((fwdHeader != null) ? fwdHeader : reqAddr);
-         }
-         public static String getFirstHeader(String h,DMaaPContext ctx)
-         {
-           List l = getHeader(h,ctx);
-           return ((l.size() > 0) ? (String)l.iterator().next() : null);
-         }
-         public static List<String> getHeader(String h,DMaaPContext ctx)
-         {
-           LinkedList list = new LinkedList();
-           Enumeration e = ctx.getRequest().getHeaders(h);
-           while (e.hasMoreElements())
-           {
-             list.add(e.nextElement().toString());
-           }
-           return list;
-         }
-         
-         public static String getKafkaproperty(){
-                 InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties");
-                       Properties props = new Properties();
-                       try {
-                               props.load(input);
-                       } catch (IOException e) {
-                               log.error("failed to read kafka.properties");
-                       }
-                       return props.getProperty("key");
-                       
-                 
-         }
-         
-         public static boolean isCadiEnabled(){
-                 boolean enableCadi=false;
-                 if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){
-                         enableCadi=true;
-                       }
-                 
-                 return enableCadi;
-         }
-                 
+       public static String getRemoteAddress(DMaaPContext ctx)
+       {
+               String reqAddr = ctx.getRequest().getRemoteAddr();
+               String fwdHeader = getFirstHeader("X-Forwarded-For",ctx);
+               return ((fwdHeader != null) ? fwdHeader : reqAddr);
+       }
+       public static String getFirstHeader(String h,DMaaPContext ctx)
+       {
+               List l = getHeader(h,ctx);
+               return ((l.size() > 0) ? (String)l.iterator().next() : null);
+       }
+       public static List<String> getHeader(String h,DMaaPContext ctx)
+       {
+               LinkedList list = new LinkedList();
+               Enumeration e = ctx.getRequest().getHeaders(h);
+               while (e.hasMoreElements())
+               {
+                       list.add(e.nextElement().toString());
+               }
+               return list;
+       }
+
+       public static String getKafkaproperty(){
+               InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties");
+               Properties props = new Properties();
+               try {
+                       props.load(input);
+               } catch (IOException e) {
+                       log.error("failed to read kafka.properties");
+               }
+               return props.getProperty("key");
+
+
+       }
+
+       public static boolean isCadiEnabled(){
+               boolean enableCadi=false;
+               if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){
+                       enableCadi=true;
+               }
+
+               return enableCadi;
+       }
+
+       public static Properties addSaslProps(){
+               Properties props = new Properties();
+               String saslMech = System.getenv("SASLMECH");
+               if (saslMech != null && saslMech.equals("scram-sha-512")) {
+                       props.put("sasl.jaas.config", System.getenv("JAASLOGIN"));
+                       props.put(SASL_MECH, saslMech.toUpperCase());
+               }
+               else {
+                       props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='" + getKafkaproperty() + "';");
+                       props.put(SASL_MECH, "PLAIN");
+               }
+               props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+               log.info("KafkaAdmin sasl.mechanism set to " + props.getProperty(SASL_MECH));
+               return props;
+
+       }
 }
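
addSaslProps() is the heart of the change: SASLMECH=scram-sha-512 selects
SCRAM-SHA-512 and takes the JAAS line verbatim from JAASLOGIN; any other value
(or an unset variable) falls back to the previous PLAIN behaviour, with the
password read from the "key" entry of kafka.properties; security.protocol is
always SASL_PLAINTEXT. A caller-side sketch (environment values illustrative):

    // SASLMECH=scram-sha-512
    // JAASLOGIN=org.apache.kafka.common.security.scram.ScramLoginModule required username='u' password='p';
    Properties sasl = Utils.addSaslProps();
    sasl.getProperty(Utils.SASL_MECH);      // "SCRAM-SHA-512"
    sasl.getProperty("security.protocol");  // "SASL_PLAINTEXT"
    sasl.getProperty("sasl.jaas.config");   // the JAASLOGIN value, passed through unchanged
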
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java
index 7a0fe78..44047e4 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java
@@ -44,7 +44,7 @@ public class KafkaPublisherTest {
        public void setUp() throws Exception {
                MockitoAnnotations.initMocks(this);
                PowerMockito.mockStatic(Utils.class);
-               PowerMockito.when(Utils.isCadiEnabled()).thenReturn(true);
+               PowerMockito.when(Utils.isCadiEnabled()).thenReturn(false);
 
        }
 
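Flipping the mocked Utils.isCadiEnabled() from true to false keeps
KafkaPublisherTest on the non-SASL path, so constructing the publisher no
longer depends on kafka.properties or the SASL environment variables.
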
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java b/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java
index f49f615..9d7a931 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java
@@ -9,7 +9,7 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  ============LICENSE_END=========================================================
  *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
 
- package org.onap.dmaap.mr.cambria.embed;
+package org.onap.dmaap.mr.cambria.embed;
 
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.nsa.security.db.BaseNsaApiDbImpl;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher;
-import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker;
-import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher;
+import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker;
+import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue;
 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory;
 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
 import org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet;
@@ -49,13 +50,11 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
 import org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory;
 import org.onap.dmaap.dmf.mr.utils.PropertyReader;
-import com.att.nsa.security.db.BaseNsaApiDbImpl;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
+import org.onap.dmaap.dmf.mr.utils.Utils;
 
 
 public class EmbedConfigurationReader {
-       private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
+    private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
     public static final String TEST_TOPIC = "testTopic";
     private static final int BROKER_ID = 0;
     private static final int BROKER_PORT = 5000;
@@ -69,49 +68,49 @@ public class EmbedConfigurationReader {
     String dir;
     private  AdminClient fKafkaAdminClient;
     KafkaLocal kafkaLocal;
-       
-       public void setUp() throws Exception {
-               
-               ClassLoader classLoader = getClass().getClassLoader();          
-               AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
-               
-               Properties kafkaProperties;
+
+    public void setUp() throws Exception {
+
+        ClassLoader classLoader = getClass().getClassLoader();
+        AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
+
+        Properties kafkaProperties;
         Properties zkProperties;
 
         try {
             //load properties
-               dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
+            dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
             kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);
             zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR);
 
             //start kafkaLocalServer
             kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);
-            
+
             Map<String, String> map = AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
             map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);
             map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);
             map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);
-            
+
             DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
-            
+
             final Properties props = new Properties ();
-            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
-            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
-                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
-            props.put("sasl.mechanism", "PLAIN");
-            fKafkaAdminClient = AdminClient.create ( props );
-            
-           // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
+            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
+            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+            props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+            props.put("sasl.mechanism", "PLAIN");
+            fKafkaAdminClient = AdminClient.create ( props );
+
+            // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
             // AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
-            final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
-                        fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
+            final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
+            fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
             Thread.sleep(5000);
         } catch (Exception e){
             e.printStackTrace(System.out);
-        }      
-       }
-       
-       private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
+        }
+    }
+
+    private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
         Properties properties = new Properties();
         properties.put("port", port + "");
         properties.put("broker.id", brokerId + "");
@@ -122,47 +121,47 @@ public class EmbedConfigurationReader {
         properties.put("consumer.timeout.ms", -1);
         return properties;
     }
-       
-       private static Properties getZookeeperProperties(int port, String zookeeperDir) {
+
+    private static Properties getZookeeperProperties(int port, String zookeeperDir) {
         Properties properties = new Properties();
         properties.put("clientPort", port + "");
         properties.put("dataDir", zookeeperDir);
         return properties;
     }
 
-       public void tearDown() throws Exception {
-               DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
-               if(fKafkaAdminClient!=null)
-               fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
-               //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
-               //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
-               //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
-               kafkaLocal.stop();
-               FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));                
-       }
-
-
-       public ConfigurationReader buildConfigurationReader() throws Exception {
-               
-               setUp();
-               
-               PropertyReader propertyReader = new PropertyReader();
-               DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
-               DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
-               DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
-               CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
-               DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
-               MemoryQueue memoryQueue = new MemoryQueue();
-               MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
-               BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
-               DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
-               KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
-               DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
-               
-               return new ConfigurationReader(propertyReader, 
-                               dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher, 
-                               curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker, 
-                               memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
-               
-       }
+    public void tearDown() throws Exception {
+        DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
+        if(fKafkaAdminClient!=null)
+            fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
+        //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
+        //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
+        //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
+        kafkaLocal.stop();
+        FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));
+    }
+
+
+    public ConfigurationReader buildConfigurationReader() throws Exception {
+
+        setUp();
+
+        PropertyReader propertyReader = new PropertyReader();
+        DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
+        DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
+        DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
+        CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
+        DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
+        MemoryQueue memoryQueue = new MemoryQueue();
+        MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
+        BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
+        DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
+        KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
+        DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
+
+        return new ConfigurationReader(propertyReader,
+                dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher,
+                curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker,
+                memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
+
+    }
 }
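
Note that EmbedConfigurationReader keeps its own inline PLAIN admin-client
configuration rather than calling Utils.addSaslProps(), so the embedded-Kafka
test setup is unaffected by the new environment variables.
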
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java b/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java
index 8a4009b..74f6750 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java
@@ -1,5 +1,5 @@
 /*******************************************************************************
-/*-
+ /*-
  * ============LICENSE_START=======================================================
  * ONAP Policy Engine
  * ================================================================================
@@ -8,9 +8,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
- package org.onap.dmaap.mr.cambria.utils;
+
+package org.onap.dmaap.mr.cambria.utils;
 
 import static org.junit.Assert.*;
 
 import java.security.Principal;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.Properties;
 
 
 import org.apache.http.auth.BasicUserPrincipal;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
 import org.springframework.mock.web.MockHttpServletRequest;
 
+
 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
 import org.onap.dmaap.dmf.mr.utils.Utils;
 
@@ -41,6 +45,9 @@ public class UtilsTest {
 
        private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS";
 
+       @Rule
+       public EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
        @Before
        public void setUp() throws Exception {
        }
@@ -57,33 +64,33 @@ public class UtilsTest {
                String expectedStr = sdf.format(now);
                assertNotNull(dateStr);
                assertTrue("Formatted date does not match - expected [" + expectedStr
-                               + "] received [" + dateStr + "]",
+                                               + "] received [" + dateStr + "]",
                                dateStr.equalsIgnoreCase(expectedStr));
        }
-       
+
        @Test
        public void testgetUserApiKey(){
                MockHttpServletRequest request = new MockHttpServletRequest();
                request.addHeader(Utils.CAMBRIA_AUTH_HEADER, "User:Password");
                assertEquals("User", Utils.getUserApiKey(request));
-               
+
                MockHttpServletRequest request2 = new MockHttpServletRequest();
                Principal principal = new BasicUserPrincipal("User@Test");
                request2.setUserPrincipal(principal);
                request2.addHeader("Authorization", "test");
                assertEquals("User", Utils.getUserApiKey(request2));
-               
+
                MockHttpServletRequest request3 = new MockHttpServletRequest();
                assertNull(Utils.getUserApiKey(request3));
        }
-       
+
        @Test
        public void testgetFromattedBatchSequenceId(){
                Long x = new Long(1234);
                String str = Utils.getFromattedBatchSequenceId(x);
-               assertEquals("001234", str);            
+               assertEquals("001234", str);
        }
-       
+
        @Test
        public void testmessageLengthInBytes(){
                String str = "TestString";
@@ -99,38 +106,58 @@ public class UtilsTest {
                assertNull(Utils.getResponseTransactionId(null));
                assertNull(Utils.getResponseTransactionId(""));
        }
-       
+
        @Test
        public void testgetSleepMsForRate(){
                long x = Utils.getSleepMsForRate(1024.124);
                assertEquals(1000, x);
                assertEquals(0, Utils.getSleepMsForRate(-1));
        }
-       
+
        @Test
        public void testgetRemoteAddress(){
                DMaaPContext dMaapContext = new DMaaPContext();
                MockHttpServletRequest request = new MockHttpServletRequest();
-               
+
                dMaapContext.setRequest(request);
-               
+
                assertEquals(request.getRemoteAddr(), Utils.getRemoteAddress(dMaapContext));
-               
+
                request.addHeader("X-Forwarded-For", "XForward");
                assertEquals("XForward", Utils.getRemoteAddress(dMaapContext));
-               
-               
+
+
        }
-       
+
        @Test
        public void testGetKey(){
                assertNotNull(Utils.getKafkaproperty());
-               
+
        }
-       
+
        @Test
        public void testCadiEnable(){
                assertFalse(Utils.isCadiEnabled());
-               
+
+       }
+
+       @Test
+       public void testaddSaslPropsPlain() {
+               Properties props = new Properties();
+               props.put("security.protocol", "SASL_PLAINTEXT");
+               props.put(Utils.SASL_MECH, "PLAIN");
+               props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+               assertEquals(props, Utils.addSaslProps());
+       }
+
+       @Test
+       public void testaddSaslPropsScram(){
+               Properties props = new Properties();
+               environmentVariables.set("SASLMECH", "scram-sha-512");
+               environmentVariables.set("JAASLOGIN", "org.apache.kafka.common.security.scram.ScramLoginModule required username='onap-dmaap-strimzi-kafka-admin' password='qul6A3TLvidY';");
+               props.put("security.protocol", "SASL_PLAINTEXT");
+               props.put(Utils.SASL_MECH, "SCRAM-SHA-512");
+               props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='onap-dmaap-strimzi-kafka-admin' password='qul6A3TLvidY';");
+               assertEquals(props, Utils.addSaslProps());
        }
 }
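
Taken together, the two new tests cover both branches of Utils.addSaslProps():
testaddSaslPropsPlain exercises the PLAIN fallback (its expected password,
admin_secret, implies the test classpath's kafka.properties carries
key=admin_secret), and testaddSaslPropsScram drives the SCRAM path purely
through the environment stubbed by the system-rules EnvironmentVariables rule.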