1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
23 package org.onap.dmaap.mr.cambria.embed;
import java.io.File;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.nsa.security.db.BaseNsaApiDbImpl;
import com.att.nsa.security.db.simple.NsaSimpleApiKey;
import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher;
import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker;
import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue;
import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory;
import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
import org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet;
import org.onap.dmaap.dmf.mr.beans.DMaaPZkClient;
import org.onap.dmaap.dmf.mr.beans.DMaaPZkConfigDb;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
import org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory;
import org.onap.dmaap.dmf.mr.utils.PropertyReader;
58 public class EmbedConfigurationReader {
59 private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
60 public static final String TEST_TOPIC = "testTopic";
61 private static final int BROKER_ID = 0;
62 private static final int BROKER_PORT = 5000;
63 private static final String LOCALHOST_BROKER = String.format("localhost:%d", BROKER_PORT);
65 private static final String DEFAULT_ZOOKEEPER_LOG_DIR = "/zookeeper";
66 private static final int ZOOKEEPER_PORT = 2000;
67 private static final String ZOOKEEPER_HOST = String.format("localhost:%d", ZOOKEEPER_PORT);
69 private static final String groupId = "groupID";
71 private AdminClient fKafkaAdminClient;
72 KafkaLocal kafkaLocal;
74 public void setUp() throws Exception {
76 ClassLoader classLoader = getClass().getClassLoader();
77 AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
79 Properties kafkaProperties;
80 Properties zkProperties;
84 dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
85 kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);
86 zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR);
88 //start kafkaLocalServer
89 kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);
91 Map<String, String> map = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
92 map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);
93 map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);
94 map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);
96 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
98 final Properties props = new Properties ();
99 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
100 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret'");
101 props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
102 props.put("sasl.mechanism", "PLAIN");
103 fKafkaAdminClient = AdminClient.create ( props );
105 // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
106 // AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
107 final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
108 fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
110 } catch (Exception e){
111 e.printStackTrace(System.out);
115 private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
116 Properties properties = new Properties();
117 properties.put("port", port + "");
118 properties.put("broker.id", brokerId + "");
119 properties.put("log.dir", logDir);
120 properties.put("zookeeper.connect", ZOOKEEPER_HOST);
121 properties.put("default.replication.factor", "1");
122 properties.put("delete.topic.enable", "true");
123 properties.put("consumer.timeout.ms", -1);
127 private static Properties getZookeeperProperties(int port, String zookeeperDir) {
128 Properties properties = new Properties();
129 properties.put("clientPort", port + "");
130 properties.put("dataDir", zookeeperDir);
134 public void tearDown() throws Exception {
135 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
136 if(fKafkaAdminClient!=null)
137 fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
138 //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
139 //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
140 //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
142 FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));
146 public ConfigurationReader buildConfigurationReader() throws Exception {
150 PropertyReader propertyReader = new PropertyReader();
151 DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
152 DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
153 DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
154 CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
155 DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
156 MemoryQueue memoryQueue = new MemoryQueue();
157 MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
158 BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
159 DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
160 KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
161 DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
163 return new ConfigurationReader(propertyReader,
164 dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher,
165 curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker,
166 memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);