1 /*******************************************************************************
\r
2 * ============LICENSE_START=======================================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
10 * http://www.apache.org/licenses/LICENSE-2.0
\r
12 * Unless required by applicable law or agreed to in writing, software
\r
13 * distributed under the License is distributed on an "AS IS" BASIS,
\r
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
15 * See the License for the specific language governing permissions and
\r
16 * limitations under the License.
\r
17 * ============LICENSE_END=========================================================
\r
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
21 *******************************************************************************/
\r
23 package com.att.nsa.cambria.embed;
\r
25 import java.io.File;
\r
26 import java.util.Arrays;
\r
27 import java.util.Map;
\r
28 import java.util.Properties;
\r
30 import org.apache.commons.io.FileUtils;
\r
31 import org.apache.curator.framework.CuratorFramework;
\r
33 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
\r
34 import com.att.dmf.mr.backends.kafka.KafkaPublisher;
\r
35 import com.att.dmf.mr.backends.memory.MemoryMetaBroker;
\r
36 import com.att.dmf.mr.backends.memory.MemoryQueue;
\r
37 import org.apache.kafka.clients.admin.AdminClient;
\r
38 import org.apache.kafka.clients.admin.AdminClientConfig;
\r
39 import org.apache.kafka.clients.admin.CreateTopicsResult;
\r
40 import org.apache.kafka.clients.admin.NewTopic;
\r
41 import org.apache.kafka.common.KafkaFuture;
\r
42 import com.att.dmf.mr.beans.DMaaPKafkaConsumerFactory;
\r
43 import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
\r
44 import com.att.dmf.mr.beans.DMaaPMetricsSet;
\r
45 import com.att.dmf.mr.beans.DMaaPZkClient;
\r
46 import com.att.dmf.mr.beans.DMaaPZkConfigDb;
\r
47 import com.att.dmf.mr.constants.CambriaConstants;
\r
48 import com.att.dmf.mr.security.DMaaPAuthenticator;
\r
49 import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
\r
50 import com.att.dmf.mr.utils.ConfigurationReader;
\r
51 import com.att.dmf.mr.utils.DMaaPCuratorFactory;
\r
52 import com.att.dmf.mr.utils.PropertyReader;
\r
53 import com.att.nsa.security.db.BaseNsaApiDbImpl;
\r
54 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
\r
55 import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
\r
58 public class EmbedConfigurationReader {
\r
59 private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
\r
60 public static final String TEST_TOPIC = "testTopic";
\r
61 private static final int BROKER_ID = 0;
\r
62 private static final int BROKER_PORT = 5000;
\r
63 private static final String LOCALHOST_BROKER = String.format("localhost:%d", BROKER_PORT);
\r
65 private static final String DEFAULT_ZOOKEEPER_LOG_DIR = "/zookeeper";
\r
66 private static final int ZOOKEEPER_PORT = 2000;
\r
67 private static final String ZOOKEEPER_HOST = String.format("localhost:%d", ZOOKEEPER_PORT);
\r
69 private static final String groupId = "groupID";
\r
71 private AdminClient fKafkaAdminClient;
\r
72 KafkaLocal kafkaLocal;
\r
74 public void setUp() throws Exception {
\r
76 ClassLoader classLoader = getClass().getClassLoader();
\r
77 AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
\r
79 Properties kafkaProperties;
\r
80 Properties zkProperties;
\r
84 dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
\r
85 kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);
\r
86 zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR);
\r
88 //start kafkaLocalServer
\r
89 kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);
\r
91 Map<String, String> map = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
\r
92 map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);
\r
93 map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);
\r
94 map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);
\r
96 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
\r
98 final Properties props = new Properties ();
\r
99 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
\r
100 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret'");
\r
101 props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
\r
102 props.put("sasl.mechanism", "PLAIN");
\r
103 fKafkaAdminClient = AdminClient.create ( props );
\r
105 // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
\r
106 // AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
\r
107 final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
\r
108 fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
\r
109 Thread.sleep(5000);
\r
110 } catch (Exception e){
\r
111 e.printStackTrace(System.out);
\r
115 private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
\r
116 Properties properties = new Properties();
\r
117 properties.put("port", port + "");
\r
118 properties.put("broker.id", brokerId + "");
\r
119 properties.put("log.dir", logDir);
\r
120 properties.put("zookeeper.connect", ZOOKEEPER_HOST);
\r
121 properties.put("default.replication.factor", "1");
\r
122 properties.put("delete.topic.enable", "true");
\r
123 properties.put("consumer.timeout.ms", -1);
\r
127 private static Properties getZookeeperProperties(int port, String zookeeperDir) {
\r
128 Properties properties = new Properties();
\r
129 properties.put("clientPort", port + "");
\r
130 properties.put("dataDir", zookeeperDir);
\r
134 public void tearDown() throws Exception {
\r
135 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
\r
136 if(fKafkaAdminClient!=null)
\r
137 fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
\r
138 //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
\r
139 //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
\r
140 //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
\r
142 FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));
\r
146 public ConfigurationReader buildConfigurationReader() throws Exception {
\r
150 PropertyReader propertyReader = new PropertyReader();
\r
151 DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
\r
152 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
\r
153 DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
\r
154 CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
\r
155 DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
\r
156 MemoryQueue memoryQueue = new MemoryQueue();
\r
157 MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
\r
158 BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
\r
159 DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
\r
160 KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
\r
161 DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
\r
163 return new ConfigurationReader(propertyReader,
\r
164 dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher,
\r
165 curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker,
\r
166 memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
\r