/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/

package com.att.nsa.cambria.embed;

import java.io.File;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.dmf.mr.backends.kafka.KafkaPublisher;
import com.att.dmf.mr.backends.memory.MemoryMetaBroker;
import com.att.dmf.mr.backends.memory.MemoryQueue;
import com.att.dmf.mr.beans.DMaaPKafkaConsumerFactory;
import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
import com.att.dmf.mr.beans.DMaaPMetricsSet;
import com.att.dmf.mr.beans.DMaaPZkClient;
import com.att.dmf.mr.beans.DMaaPZkConfigDb;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.security.DMaaPAuthenticator;
import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
import com.att.dmf.mr.utils.ConfigurationReader;
import com.att.dmf.mr.utils.DMaaPCuratorFactory;
import com.att.dmf.mr.utils.PropertyReader;
import com.att.nsa.security.db.BaseNsaApiDbImpl;
import com.att.nsa.security.db.simple.NsaSimpleApiKey;
import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;

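/**
 * Test helper that boots an embedded Kafka broker and ZooKeeper instance and
 * exposes a {@link ConfigurationReader} wired against them, so Message Router
 * components can be exercised without an external cluster.
 */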
public class EmbedConfigurationReader {

    private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
    public static final String TEST_TOPIC = "testTopic";
    private static final int BROKER_ID = 0;
    private static final int BROKER_PORT = 5000;
    private static final String LOCALHOST_BROKER = String.format("localhost:%d", BROKER_PORT);

    private static final String DEFAULT_ZOOKEEPER_LOG_DIR = "/zookeeper";
    private static final int ZOOKEEPER_PORT = 2000;
    private static final String ZOOKEEPER_HOST = String.format("localhost:%d", ZOOKEEPER_PORT);

    private static final String groupId = "groupID";

    String dir;
    private AdminClient fKafkaAdminClient;
    KafkaLocal kafkaLocal;

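    /**
     * Starts the embedded Kafka/ZooKeeper pair, points the MsgRtr property map at
     * the local endpoints and pre-creates {@code TEST_TOPIC} through the Kafka
     * AdminClient.
     */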
    public void setUp() throws Exception {

        ClassLoader classLoader = getClass().getClassLoader();
        AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));

        Properties kafkaProperties;
        Properties zkProperties;

        try {
            // load properties
            dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
            kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);
            zkProperties = getZookeeperProperties(ZOOKEEPER_PORT, dir + DEFAULT_ZOOKEEPER_LOG_DIR);

            // start the local Kafka/ZooKeeper servers
            kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);

            Map<String, String> map = AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
            map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);
            map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);
            map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);

            DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());

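            // The AdminClient below targets localhost:9092 over SASL_PLAINTEXT with the
            // admin/admin_secret PLAIN credentials; this assumes a broker listener and
            // JAAS setup matching that configuration is available to the test.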
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
            props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            fKafkaAdminClient = AdminClient.create(props);

            // if (!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
            //     AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
            final NewTopic topicRequest = new NewTopic(TEST_TOPIC, 3, (short) 1);
            fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
            // give the broker a moment to complete topic creation
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace(System.out);
        }
    }

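    /**
     * Builds the minimal broker configuration used by the embedded Kafka instance.
     */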
    private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
        Properties properties = new Properties();
        properties.put("port", port + "");
        properties.put("broker.id", brokerId + "");
        properties.put("log.dir", logDir);
        properties.put("zookeeper.connect", ZOOKEEPER_HOST);
        properties.put("default.replication.factor", "1");
        properties.put("delete.topic.enable", "true");
        properties.put("consumer.timeout.ms", "-1");
        return properties;
    }

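    /**
     * Builds the minimal configuration for the embedded ZooKeeper server.
     */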
    private static Properties getZookeeperProperties(int port, String zookeeperDir) {
        Properties properties = new Properties();
        properties.put("clientPort", port + "");
        properties.put("dataDir", zookeeperDir);
        return properties;
    }

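    /**
     * Deletes the test topic, stops the embedded Kafka/ZooKeeper pair and removes
     * the Kafka log directory created during setup.
     */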
    public void tearDown() throws Exception {
        DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
        if (fKafkaAdminClient != null) {
            fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
        }
        // AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
        // dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
        // dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
        kafkaLocal.stop();
        FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));
    }

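    /**
     * Starts the embedded stack via {@link #setUp()} and wires up a
     * {@link ConfigurationReader} with the DMaaP beans (metrics, ZooKeeper client,
     * config DB, Kafka publisher, consumer factory, meta brokers, API key DB and
     * authenticator) backed by it.
     */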
    public ConfigurationReader buildConfigurationReader() throws Exception {

        setUp();

        PropertyReader propertyReader = new PropertyReader();
        DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
        DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
        DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
        CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
        DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework, null);
        MemoryQueue memoryQueue = new MemoryQueue();
        MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
        BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
        DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
        KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
        DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);

        return new ConfigurationReader(propertyReader,
                dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher,
                curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker,
                memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
    }
}