DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / test / java / org / onap / dmaap / mr / cambria / embed / EmbedConfigurationReader.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22
23  package org.onap.dmaap.mr.cambria.embed;
24
25 import java.io.File;
26 import java.util.Arrays;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.apache.commons.io.FileUtils;
31 import org.apache.curator.framework.CuratorFramework;
32
33 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
34 import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher;
35 import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker;
36 import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue;
37 import org.apache.kafka.clients.admin.AdminClient;
38 import org.apache.kafka.clients.admin.AdminClientConfig;
39 import org.apache.kafka.clients.admin.NewTopic;
40 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory;
41 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
42 import org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet;
43 import org.onap.dmaap.dmf.mr.beans.DMaaPZkClient;
44 import org.onap.dmaap.dmf.mr.beans.DMaaPZkConfigDb;
45 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
46 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
47 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
48 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
49 import org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory;
50 import org.onap.dmaap.dmf.mr.utils.PropertyReader;
51 import com.att.nsa.security.db.BaseNsaApiDbImpl;
52 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
53 import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
54
55
56 public class EmbedConfigurationReader {
57         private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
58     public static final String TEST_TOPIC = "testTopic";
59     private static final int BROKER_ID = 0;
60     private static final int BROKER_PORT = 5000;
61     private static final String LOCALHOST_BROKER = String.format("localhost:%d", BROKER_PORT);
62
63     private static final String DEFAULT_ZOOKEEPER_LOG_DIR = "/zookeeper";
64     private static final int ZOOKEEPER_PORT = 2000;
65     private static final String ZOOKEEPER_HOST = String.format("localhost:%d", ZOOKEEPER_PORT);
66
67     private static final String groupId = "groupID";
68     String dir;
69     private  AdminClient fKafkaAdminClient;
70     KafkaLocal kafkaLocal;
71         
72         public void setUp() throws Exception {
73                 
74                 ClassLoader classLoader = getClass().getClassLoader();          
75                 AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
76                 
77                 Properties kafkaProperties;
78         Properties zkProperties;
79
80         try {
81             //load properties
82                 dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
83             kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);
84             zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR);
85
86             //start kafkaLocalServer
87             kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);
88             
89             Map<String, String> map = AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
90             map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);
91             map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);
92             map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);
93             
94             DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
95             
96             final Properties props = new Properties ();
97              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
98              props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret'");
99                  props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
100              props.put("sasl.mechanism", "PLAIN");
101              fKafkaAdminClient = AdminClient.create ( props );
102              
103            // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
104             //  AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
105              final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
106                          fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
107             Thread.sleep(5000);
108         } catch (Exception e){
109             e.printStackTrace(System.out);
110         }       
111         }
112         
113         private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
114         Properties properties = new Properties();
115         properties.put("port", port + "");
116         properties.put("broker.id", brokerId + "");
117         properties.put("log.dir", logDir);
118         properties.put("zookeeper.connect", ZOOKEEPER_HOST);
119         properties.put("default.replication.factor", "1");
120         properties.put("delete.topic.enable", "true");
121         properties.put("consumer.timeout.ms", -1);
122         return properties;
123     }
124         
125         private static Properties getZookeeperProperties(int port, String zookeeperDir) {
126         Properties properties = new Properties();
127         properties.put("clientPort", port + "");
128         properties.put("dataDir", zookeeperDir);
129         return properties;
130     }
131
132         public void tearDown() throws Exception {
133                 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
134                 if(fKafkaAdminClient!=null)
135                 fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
136                 //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
137                 //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
138                 //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
139                 kafkaLocal.stop();
140                 FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));                
141         }
142
143
144         public ConfigurationReader buildConfigurationReader() throws Exception {
145                 
146                 setUp();
147                 
148                 PropertyReader propertyReader = new PropertyReader();
149                 DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
150                 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
151                 DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
152                 CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
153                 DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
154                 MemoryQueue memoryQueue = new MemoryQueue();
155                 MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
156                 BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
157                 DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
158                 KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
159                 DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
160                 
161                 return new ConfigurationReader(propertyReader, 
162                                 dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher, 
163                                 curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker, 
164                                 memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
165                 
166         }
167 }