1 /*******************************************************************************
\r
2 * ============LICENSE_START=======================================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
10 * http://www.apache.org/licenses/LICENSE-2.0
\r
12 * Unless required by applicable law or agreed to in writing, software
\r
13 * distributed under the License is distributed on an "AS IS" BASIS,
\r
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
15 * See the License for the specific language governing permissions and
\r
16 * limitations under the License.
\r
17 * ============LICENSE_END=========================================================
\r
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
21 *******************************************************************************/
\r
23 package com.att.nsa.cambria.embed;
\r
25 import java.io.File;
\r
26 import java.util.Map;
\r
27 import java.util.Properties;
\r
29 import org.apache.commons.io.FileUtils;
\r
30 import org.apache.curator.framework.CuratorFramework;
\r
32 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
\r
33 import com.att.nsa.cambria.backends.kafka.KafkaPublisher;
\r
34 import com.att.nsa.cambria.backends.memory.MemoryMetaBroker;
\r
35 import com.att.nsa.cambria.backends.memory.MemoryQueue;
\r
36 import com.att.nsa.cambria.beans.DMaaPKafkaConsumerFactory;
\r
37 import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
\r
38 import com.att.nsa.cambria.beans.DMaaPMetricsSet;
\r
39 import com.att.nsa.cambria.beans.DMaaPZkClient;
\r
40 import com.att.nsa.cambria.beans.DMaaPZkConfigDb;
\r
41 import com.att.nsa.cambria.constants.CambriaConstants;
\r
42 import com.att.nsa.cambria.security.DMaaPAuthenticator;
\r
43 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
\r
44 import com.att.nsa.cambria.utils.ConfigurationReader;
\r
45 import com.att.nsa.cambria.utils.DMaaPCuratorFactory;
\r
46 import com.att.nsa.cambria.utils.PropertyReader;
\r
47 import com.att.nsa.security.db.BaseNsaApiDbImpl;
\r
48 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
\r
49 import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
\r
51 import kafka.admin.AdminUtils;
\r
53 public class EmbedConfigurationReader {
\r
54 private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
\r
55 public static final String TEST_TOPIC = "testTopic";
\r
56 private static final int BROKER_ID = 0;
\r
57 private static final int BROKER_PORT = 5000;
\r
58 private static final String LOCALHOST_BROKER = String.format("localhost:%d", BROKER_PORT);
\r
60 private static final String DEFAULT_ZOOKEEPER_LOG_DIR = "/zookeeper";
\r
61 private static final int ZOOKEEPER_PORT = 2000;
\r
62 private static final String ZOOKEEPER_HOST = String.format("localhost:%d", ZOOKEEPER_PORT);
\r
64 private static final String groupId = "groupID";
\r
67 KafkaLocal kafkaLocal;
\r
69 public void setUp() throws Exception {
\r
71 ClassLoader classLoader = getClass().getClassLoader();
\r
72 AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
\r
74 Properties kafkaProperties;
\r
75 Properties zkProperties;
\r
79 dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
\r
80 kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);
\r
81 zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR);
\r
83 //start kafkaLocalServer
\r
84 kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);
\r
86 Map<String, String> map = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
\r
87 map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);
\r
88 map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);
\r
89 map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);
\r
91 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
\r
92 if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
\r
93 AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
\r
95 } catch (Exception e){
\r
96 e.printStackTrace(System.out);
\r
100 private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
\r
101 Properties properties = new Properties();
\r
102 properties.put("port", port + "");
\r
103 properties.put("broker.id", brokerId + "");
\r
104 properties.put("log.dir", logDir);
\r
105 properties.put("zookeeper.connect", ZOOKEEPER_HOST);
\r
106 properties.put("default.replication.factor", "1");
\r
107 properties.put("delete.topic.enable", "true");
\r
108 properties.put("consumer.timeout.ms", -1);
\r
112 private static Properties getZookeeperProperties(int port, String zookeeperDir) {
\r
113 Properties properties = new Properties();
\r
114 properties.put("clientPort", port + "");
\r
115 properties.put("dataDir", zookeeperDir);
\r
119 public void tearDown() throws Exception {
\r
120 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
\r
121 AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
\r
122 //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
\r
123 //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
\r
125 FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));
\r
/**
 * Wires up a {@link ConfigurationReader} whose Kafka/ZooKeeper collaborators
 * all point at the embedded servers started by {@link #setUp()}.
 *
 * @return a ConfigurationReader backed by the embedded infrastructure
 * @throws Exception if any collaborator fails to initialize
 */
public ConfigurationReader buildConfigurationReader() throws Exception {
    PropertyReader propertyReader = new PropertyReader();
    DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
    DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
    DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
    CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
    DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory =
            new DMaaPKafkaConsumerFactory(propertyReader, dMaaPMetricsSet, curatorFramework);
    MemoryQueue memoryQueue = new MemoryQueue();
    MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
    BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl =
            new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
    DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator =
            new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
    KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
    DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker =
            new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);

    return new ConfigurationReader(propertyReader,
            dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher,
            curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker,
            memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
}
}