[MR] Add support for configuring jaas.sasl.config at runtime
[dmaap/messagerouter/messageservice.git] / src / test / java / org / onap / dmaap / mr / cambria / embed / EmbedConfigurationReader.java
index f49f615..9d7a931 100644 (file)
@@ -9,7 +9,7 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  ============LICENSE_END=========================================================
  *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
 
- package org.onap.dmaap.mr.cambria.embed;
+package org.onap.dmaap.mr.cambria.embed;
 
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.nsa.security.db.BaseNsaApiDbImpl;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher;
-import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker;
-import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher;
+import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker;
+import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue;
 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory;
 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
 import org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet;
@@ -49,13 +50,11 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
 import org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory;
 import org.onap.dmaap.dmf.mr.utils.PropertyReader;
-import com.att.nsa.security.db.BaseNsaApiDbImpl;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
+import org.onap.dmaap.dmf.mr.utils.Utils;
 
 
 public class EmbedConfigurationReader {
-       private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
+    private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
     public static final String TEST_TOPIC = "testTopic";
     private static final int BROKER_ID = 0;
     private static final int BROKER_PORT = 5000;
@@ -69,49 +68,49 @@ public class EmbedConfigurationReader {
     String dir;
     private  AdminClient fKafkaAdminClient;
     KafkaLocal kafkaLocal;
-       
-       public void setUp() throws Exception {
-               
-               ClassLoader classLoader = getClass().getClassLoader();          
-               AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
-               
-               Properties kafkaProperties;
+
+    public void setUp() throws Exception {
+
+        ClassLoader classLoader = getClass().getClassLoader();
+        AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
+
+        Properties kafkaProperties;
         Properties zkProperties;
 
         try {
             //load properties
-               dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
+            dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
             kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);
             zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR);
 
             //start kafkaLocalServer
             kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);
-            
+
             Map<String, String> map = AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
             map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);
             map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);
             map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);
-            
+
             DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
-            
+
             final Properties props = new Properties ();
-            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
-            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
-                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
-            props.put("sasl.mechanism", "PLAIN");
-            fKafkaAdminClient = AdminClient.create ( props );
-            
-           // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
+            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
+            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+            props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+            props.put("sasl.mechanism", "PLAIN");
+            fKafkaAdminClient = AdminClient.create ( props );
+
+            // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
             // AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
-            final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
-                        fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
+            final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
+            fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
             Thread.sleep(5000);
         } catch (Exception e){
             e.printStackTrace(System.out);
-        }      
-       }
-       
-       private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
+        }
+    }
+
+    private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
         Properties properties = new Properties();
         properties.put("port", port + "");
         properties.put("broker.id", brokerId + "");
@@ -122,47 +121,47 @@ public class EmbedConfigurationReader {
         properties.put("consumer.timeout.ms", -1);
         return properties;
     }
-       
-       private static Properties getZookeeperProperties(int port, String zookeeperDir) {
+
+    private static Properties getZookeeperProperties(int port, String zookeeperDir) {
         Properties properties = new Properties();
         properties.put("clientPort", port + "");
         properties.put("dataDir", zookeeperDir);
         return properties;
     }
 
-       public void tearDown() throws Exception {
-               DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
-               if(fKafkaAdminClient!=null)
-               fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
-               //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
-               //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
-               //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
-               kafkaLocal.stop();
-               FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));                
-       }
-
-
-       public ConfigurationReader buildConfigurationReader() throws Exception {
-               
-               setUp();
-               
-               PropertyReader propertyReader = new PropertyReader();
-               DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
-               DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
-               DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
-               CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
-               DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
-               MemoryQueue memoryQueue = new MemoryQueue();
-               MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
-               BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
-               DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
-               KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
-               DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
-               
-               return new ConfigurationReader(propertyReader, 
-                               dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher, 
-                               curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker, 
-                               memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
-               
-       }
+    public void tearDown() throws Exception {
+        DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
+        if(fKafkaAdminClient!=null)
+            fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
+        //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
+        //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
+        //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
+        kafkaLocal.stop();
+        FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));
+    }
+
+
+    public ConfigurationReader buildConfigurationReader() throws Exception {
+
+        setUp();
+
+        PropertyReader propertyReader = new PropertyReader();
+        DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
+        DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
+        DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
+        CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
+        DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
+        MemoryQueue memoryQueue = new MemoryQueue();
+        MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
+        BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
+        DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
+        KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
+        DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
+
+        return new ConfigurationReader(propertyReader,
+                dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher,
+                curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker,
+                memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
+
+    }
 }