Update the package name from com.att.nsa.cambria.embed to org.onap.dmaap.mr.cambria.embed.
[dmaap/messagerouter/msgrtr.git] / src / test / java / org / onap / dmaap / mr / cambria / embed / EmbedConfigurationReader.java
-/*******************************************************************************\r
- *  ============LICENSE_START=======================================================\r
- *  org.onap.dmaap\r
- *  ================================================================================\r
- *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- *  ================================================================================\r
- *  Licensed under the Apache License, Version 2.0 (the "License");\r
- *  you may not use this file except in compliance with the License.\r
- *  You may obtain a copy of the License at\r
- *        http://www.apache.org/licenses/LICENSE-2.0\r
- *  \r
- *  Unless required by applicable law or agreed to in writing, software\r
- *  distributed under the License is distributed on an "AS IS" BASIS,\r
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- *  See the License for the specific language governing permissions and\r
- *  limitations under the License.\r
- *  ============LICENSE_END=========================================================\r
- *\r
- *  ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
- *  \r
- *******************************************************************************/\r
-\r
-package com.att.nsa.cambria.embed;\r
-\r
-import java.io.File;\r
-import java.util.Arrays;\r
-import java.util.Map;\r
-import java.util.Properties;\r
-\r
-import org.apache.commons.io.FileUtils;\r
-import org.apache.curator.framework.CuratorFramework;\r
-\r
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;\r
-import com.att.dmf.mr.backends.kafka.KafkaPublisher;\r
-import com.att.dmf.mr.backends.memory.MemoryMetaBroker;\r
-import com.att.dmf.mr.backends.memory.MemoryQueue;\r
-import org.apache.kafka.clients.admin.AdminClient;\r
-import org.apache.kafka.clients.admin.AdminClientConfig;\r
-import org.apache.kafka.clients.admin.CreateTopicsResult;\r
-import org.apache.kafka.clients.admin.NewTopic;\r
-import org.apache.kafka.common.KafkaFuture;\r
-import com.att.dmf.mr.beans.DMaaPKafkaConsumerFactory;\r
-import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;\r
-import com.att.dmf.mr.beans.DMaaPMetricsSet;\r
-import com.att.dmf.mr.beans.DMaaPZkClient;\r
-import com.att.dmf.mr.beans.DMaaPZkConfigDb;\r
-import com.att.dmf.mr.constants.CambriaConstants;\r
-import com.att.dmf.mr.security.DMaaPAuthenticator;\r
-import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;\r
-import com.att.dmf.mr.utils.ConfigurationReader;\r
-import com.att.dmf.mr.utils.DMaaPCuratorFactory;\r
-import com.att.dmf.mr.utils.PropertyReader;\r
-import com.att.nsa.security.db.BaseNsaApiDbImpl;\r
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;\r
-import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;\r
-\r
-\r
-public class EmbedConfigurationReader {\r
-       private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";\r
-    public static final String TEST_TOPIC = "testTopic";\r
-    private static final int BROKER_ID = 0;\r
-    private static final int BROKER_PORT = 5000;\r
-    private static final String LOCALHOST_BROKER = String.format("localhost:%d", BROKER_PORT);\r
-\r
-    private static final String DEFAULT_ZOOKEEPER_LOG_DIR = "/zookeeper";\r
-    private static final int ZOOKEEPER_PORT = 2000;\r
-    private static final String ZOOKEEPER_HOST = String.format("localhost:%d", ZOOKEEPER_PORT);\r
-\r
-    private static final String groupId = "groupID";\r
-    String dir;\r
-    private  AdminClient fKafkaAdminClient;\r
-    KafkaLocal kafkaLocal;\r
-       \r
-       public void setUp() throws Exception {\r
-               \r
-               ClassLoader classLoader = getClass().getClassLoader();          \r
-               AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));\r
-               \r
-               Properties kafkaProperties;\r
-        Properties zkProperties;\r
-\r
-        try {\r
-            //load properties\r
-               dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();\r
-            kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);\r
-            zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR);\r
-\r
-            //start kafkaLocalServer\r
-            kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);\r
-            \r
-            Map<String, String> map = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);\r
-            map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);\r
-            map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);\r
-            map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);\r
-            \r
-            DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());\r
-            \r
-            final Properties props = new Properties ();\r
-            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );\r
-            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret'");\r
-                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            \r
-            props.put("sasl.mechanism", "PLAIN");\r
-            fKafkaAdminClient = AdminClient.create ( props );\r
-            \r
-           // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))\r
-            // AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());\r
-            final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );\r
-                        fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );\r
-            Thread.sleep(5000);\r
-        } catch (Exception e){\r
-            e.printStackTrace(System.out);\r
-        }      \r
-       }\r
-       \r
-       private static Properties getKafkaProperties(String logDir, int port, int brokerId) {\r
-        Properties properties = new Properties();\r
-        properties.put("port", port + "");\r
-        properties.put("broker.id", brokerId + "");\r
-        properties.put("log.dir", logDir);\r
-        properties.put("zookeeper.connect", ZOOKEEPER_HOST);\r
-        properties.put("default.replication.factor", "1");\r
-        properties.put("delete.topic.enable", "true");\r
-        properties.put("consumer.timeout.ms", -1);\r
-        return properties;\r
-    }\r
-       \r
-       private static Properties getZookeeperProperties(int port, String zookeeperDir) {\r
-        Properties properties = new Properties();\r
-        properties.put("clientPort", port + "");\r
-        properties.put("dataDir", zookeeperDir);\r
-        return properties;\r
-    }\r
-\r
-       public void tearDown() throws Exception {\r
-               DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());\r
-               if(fKafkaAdminClient!=null)\r
-               fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));\r
-               //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);\r
-               //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);\r
-               //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);\r
-               kafkaLocal.stop();\r
-               FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));                \r
-       }\r
-\r
-\r
-       public ConfigurationReader buildConfigurationReader() throws Exception {\r
-               \r
-               setUp();\r
-               \r
-               PropertyReader propertyReader = new PropertyReader();\r
-               DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);\r
-               DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);\r
-               DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);\r
-               CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());\r
-               DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);\r
-               MemoryQueue memoryQueue = new MemoryQueue();\r
-               MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);\r
-               BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());\r
-               DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);\r
-               KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);\r
-               DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);\r
-               \r
-               return new ConfigurationReader(propertyReader, \r
-                               dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher, \r
-                               curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker, \r
-                               memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);\r
-               \r
-       }\r
-}\r
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+
+package org.onap.dmaap.mr.cambria.embed;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher;
+import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker;
+import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory;
+import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
+import org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet;
+import org.onap.dmaap.dmf.mr.beans.DMaaPZkClient;
+import org.onap.dmaap.dmf.mr.beans.DMaaPZkConfigDb;
+import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
+import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
+import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
+import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
+import org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory;
+import org.onap.dmaap.dmf.mr.utils.PropertyReader;
+import com.att.nsa.security.db.BaseNsaApiDbImpl;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
+
+
+public class EmbedConfigurationReader {
+       private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
+    public static final String TEST_TOPIC = "testTopic";
+    private static final int BROKER_ID = 0;
+    private static final int BROKER_PORT = 5000;
+    private static final String LOCALHOST_BROKER = String.format("localhost:%d", BROKER_PORT);
+
+    private static final String DEFAULT_ZOOKEEPER_LOG_DIR = "/zookeeper";
+    private static final int ZOOKEEPER_PORT = 2000;
+    private static final String ZOOKEEPER_HOST = String.format("localhost:%d", ZOOKEEPER_PORT);
+
+    private static final String groupId = "groupID";
+    String dir;
+    private  AdminClient fKafkaAdminClient;
+    KafkaLocal kafkaLocal;
+       
+       public void setUp() throws Exception {
+               
+               ClassLoader classLoader = getClass().getClassLoader();          
+               AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
+               
+               Properties kafkaProperties;
+        Properties zkProperties;
+
+        try {
+            //load properties
+               dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
+            kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);
+            zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR);
+
+            //start kafkaLocalServer
+            kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);
+            
+            Map<String, String> map = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
+            map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);
+            map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);
+            map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);
+            
+            DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
+            
+            final Properties props = new Properties ();
+            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
+            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret'");
+                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
+            props.put("sasl.mechanism", "PLAIN");
+            fKafkaAdminClient = AdminClient.create ( props );
+            
+           // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
+            // AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
+            final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
+                        fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
+            Thread.sleep(5000);
+        } catch (Exception e){
+            e.printStackTrace(System.out);
+        }      
+       }
+       
+       private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
+        Properties properties = new Properties();
+        properties.put("port", port + "");
+        properties.put("broker.id", brokerId + "");
+        properties.put("log.dir", logDir);
+        properties.put("zookeeper.connect", ZOOKEEPER_HOST);
+        properties.put("default.replication.factor", "1");
+        properties.put("delete.topic.enable", "true");
+        properties.put("consumer.timeout.ms", -1);
+        return properties;
+    }
+       
+       private static Properties getZookeeperProperties(int port, String zookeeperDir) {
+        Properties properties = new Properties();
+        properties.put("clientPort", port + "");
+        properties.put("dataDir", zookeeperDir);
+        return properties;
+    }
+
+       public void tearDown() throws Exception {
+               DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
+               if(fKafkaAdminClient!=null)
+               fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
+               //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
+               //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
+               //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
+               kafkaLocal.stop();
+               FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));                
+       }
+
+
+       public ConfigurationReader buildConfigurationReader() throws Exception {
+               
+               setUp();
+               
+               PropertyReader propertyReader = new PropertyReader();
+               DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
+               DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
+               DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
+               CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
+               DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
+               MemoryQueue memoryQueue = new MemoryQueue();
+               MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
+               BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
+               DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
+               KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
+               DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
+               
+               return new ConfigurationReader(propertyReader, 
+                               dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher, 
+                               curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker, 
+                               memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
+               
+       }
+}