/*******************************************************************************
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package com.att.dmf.mr.beans;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Qualifier;

import com.att.dmf.mr.CambriaApiException;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
import com.att.dmf.mr.metabroker.Broker1;
import com.att.dmf.mr.metabroker.Topic;
import com.att.dmf.mr.utils.ConfigurationReader;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
/**
 * Class performing all topic operations.
 *
 * @author anowarul.islam
 */
public class DMaaPKafkaMetaBroker implements Broker1 {
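
    // Implementation note: this broker coordinates three backends. ZooKeeper
    // (fZk) is read directly under /brokers/topics to list topics; the Kafka
    // AdminClient (fKafkaAdminClient) creates and deletes the underlying Kafka
    // topics; and the config DB (fCambriaConfig) stores Cambria-level metadata
    // (owner, description, ACLs) under /topics.
    //
    // Illustrative usage (a sketch only; "settings", "zkClient" and "configDb"
    // are placeholder names for the Spring-provided beans wired through the
    // @Qualifier constructor below):
    //
    //   DMaaPKafkaMetaBroker broker = new DMaaPKafkaMetaBroker(settings, zkClient, configDb);
    //   Topic t = broker.createTopic("example.topic", "demo topic", ownerApiKey, 3, 2, false);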
    public DMaaPKafkaMetaBroker() {
        fZk = null;
        fCambriaConfig = null;
        fBaseTopicData = null;
        final Properties props = new Properties();
        String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");
        if (null == fkafkaBrokers) {
            fkafkaBrokers = "localhost:9092";
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);
        fKafkaAdminClient = AdminClient.create(props);
    }
    private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaMetaBroker.class);
    private final AdminClient fKafkaAdminClient;
    /**
     * DMaaPKafkaMetaBroker constructor initializing the broker from the
     * Spring-wired settings, ZooKeeper client, and config DB.
     *
     * @param settings
     * @param zk
     * @param configDb
     */
    public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
            @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        final Properties props = new Properties();
        String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");
        if (null == fkafkaBrokers) {
            fkafkaBrokers = "localhost:9092";
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);
        fKafkaAdminClient = AdminClient.create(props);
    }
    // convenience constructor allowing an AdminClient to be supplied directly
    public DMaaPKafkaMetaBroker(rrNvReadable settings, ZkClient zk, ConfigDb configDb, AdminClient client) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        fKafkaAdminClient = client;
    }
    @Override
    public List<Topic> getAllTopics() throws ConfigDbException {
        log.info("Retrieving list of all the topics.");
        final LinkedList<Topic> result = new LinkedList<Topic>();
        try {
            log.info("Retrieving all topics from root: " + zkTopicsRoot);
            final List<String> topics = fZk.getChildren(zkTopicsRoot);
            for (String topic : topics) {
                result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
            }
        } catch (ZkNoNodeException excp) {
            // a very fresh Kafka cluster has no topics and no topics node yet
            log.error("ZK doesn't have a Kafka topics node at " + zkTopicsRoot, excp);
        }
        return result;
    }
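
    // Note: the topic list above comes straight from ZooKeeper's
    // /brokers/topics node, so topics created outside this API (e.g. directly
    // against Kafka) appear as well, with their Cambria metadata defaulted by
    // the KafkaTopic constructor.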
    /**
     * get a topic by name, or null if it doesn't exist
     */
    @Override
    public Topic getTopic(String topic) throws ConfigDbException {
        if (fZk.exists(zkTopicsRoot + "/" + topic)) {
            return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
        }
        // else: no such topic in Kafka
        return null;
    }
    /**
     * static method to get a KafkaTopic object
     *
     * @param db
     * @param base
     * @param topic
     * @return the KafkaTopic read from the config DB
     * @throws ConfigDbException
     */
    public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
        return new KafkaTopic(topic, db, base);
    }
    /**
     * creating topic
     */
    @Override
    public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
            boolean transactionEnabled) throws TopicExistsException, CambriaApiException, ConfigDbException {
        log.info("Creating topic: " + topic);
        try {
            log.info("Check if topic [" + topic + "] exists.");
            // first check for existence "our way"
            final Topic t = getTopic(topic);
            if (t != null) {
                log.info("Could not create topic [" + topic + "]. Topic already exists.");
                throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
            }
        } catch (ConfigDbException e1) {
            log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                    "Couldn't check topic data in config db.");
        }

        // we only allow up to 3 replicas. (If we don't test this, we get weird
        // results from the cluster, so explicit test and fail.)
        if (replicas < 1 || replicas > 3) {
            log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
                    "The replica count must be between 1 and 3.");
        }
        if (partitions < 1) {
            log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
        }
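
        // AdminClient.createTopics() is asynchronous: it returns a
        // CreateTopicsResult whose all() future completes once the request has
        // been acted on by the broker. Blocking on get() below makes topic
        // creation effectively synchronous, so broker-side failures surface
        // here as an ExecutionException wrapping the real cause.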
        // create via kafka
        try {
            final NewTopic topicRequest = new NewTopic(topic, partitions, (short) replicas);
            final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
            final KafkaFuture<Void> ctrResult = ctr.all();
            ctrResult.get();
            // underlying Kafka topic created. now set up our API info
            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.warn("Execution of createTopics was interrupted.");
            throw new ConfigDbException(e);
        } catch (ExecutionException e) {
            log.warn("Execution of createTopics failed: " + e.getCause().getMessage(), e.getCause());
            throw new ConfigDbException(e.getCause());
        }
    }
    /**
     * deleting topic
     */
    @Override
    public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException, ConfigDbException {
        log.info("Deleting topic: " + topic);
        try {
            // deletion now goes through the Kafka AdminClient; the old
            // ZooKeeper/AdminUtils-based path is no longer used
            fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
            log.info("Topic [" + topic + "] deleted successfully.");
        } catch (Exception e) {
            log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
            throw new ConfigDbException(e);
        }
    }
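
    // Caveat: AdminClient.deleteTopics() is asynchronous and its result future
    // is not awaited above, so this method can return before the broker has
    // actually removed the topic. The topic's metadata entry under /topics in
    // the config DB is also left in place.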
    private final ZkClient fZk;
    private final ConfigDb fCambriaConfig;
    private final ConfigPath fBaseTopicData;

    private static final String zkTopicsRoot = "/brokers/topics";
    private static final JSONObject kEmptyAcl = new JSONObject();
    /**
     * method providing the KafkaTopic object associated with an owner, with
     * transactions enabled or not
     *
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return the stored KafkaTopic entry
     * @throws ConfigDbException
     */
    public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
            throws ConfigDbException {
        return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
    }
    /**
     * static method storing a topic's metadata and returning the KafkaTopic object
     *
     * @param db
     * @param basePath
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return the stored KafkaTopic entry
     * @throws ConfigDbException
     */
    public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
            boolean transactionEnabled) throws ConfigDbException {
        final JSONObject o = new JSONObject();
        o.put("owner", owner);
        o.put("description", desc);
        o.put("txenabled", transactionEnabled);
        db.store(basePath.getChild(name), o.toString());
        return new KafkaTopic(name, db, basePath);
    }
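
    // The stored record is a small JSON document. For a topic created through
    // this broker it looks roughly like this (the "readers"/"writers" ACLs are
    // added later via updateAcl(); "allowed" lists the permitted API keys):
    //
    //   {
    //     "owner": "owner-api-key",
    //     "description": "demo topic",
    //     "txenabled": false,
    //     "readers": { "allowed": [ "consumer-api-key" ] },
    //     "writers": { "allowed": [ "publisher-api-key" ] }
    //   }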
    /**
     * Class performing all user operations, such as checking whether a user is
     * eligible to read or write, and permitting a user to read and write.
     *
     * @author anowarul.islam
     */
    public static class KafkaTopic implements Topic {
        /**
         * constructor initializes the topic from the config DB
         *
         * @param name
         * @param configdb
         * @param baseTopic
         * @throws ConfigDbException
         */
        public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
            fName = name;
            fConfigDb = configdb;
            fBaseTopicData = baseTopic;

            String data = fConfigDb.load(fBaseTopicData.getChild(fName));
            if (data == null) {
                data = "{}";
            }

            final JSONObject o = new JSONObject(data);
            fOwner = o.optString("owner", "");
            fDesc = o.optString("description", "");
            fTransactionEnabled = o.optBoolean("txenabled", false); // default value is false

            // if this topic has an owner, it needs both read/write ACLs. If there's no
            // owner (or it's empty), null is okay -- this is for existing or implicitly
            // created topics.
            JSONObject readers = o.optJSONObject("readers");
            if (readers == null && fOwner.length() > 0) {
                readers = kEmptyAcl;
            }
            fReaders = fromJson(readers);

            JSONObject writers = o.optJSONObject("writers");
            if (writers == null && fOwner.length() > 0) {
                writers = kEmptyAcl;
            }
            fWriters = fromJson(writers);
        }
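
        // In the stored JSON, a missing "readers"/"writers" object is legal for
        // topics without an owner (existing or implicitly created topics); for
        // owned topics the constructor substitutes kEmptyAcl above so that an
        // empty ACL is built instead. fromJson() tolerates a null input either
        // way.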
        private NsaAcl fromJson(JSONObject o) {
            NsaAcl acl = new NsaAcl();
            if (o != null) {
                JSONArray a = o.optJSONArray("allowed");
                if (a != null) {
                    for (int i = 0; i < a.length(); ++i) {
                        String user = a.getString(i);
                        acl.add(user);
                    }
                }
            }
            return acl;
        }
        @Override
        public String getName() {
            return fName;
        }

        @Override
        public String getOwner() {
            return fOwner;
        }

        @Override
        public String getDescription() {
            return fDesc;
        }

        @Override
        public NsaAcl getReaderAcl() {
            return fReaders;
        }

        @Override
        public NsaAcl getWriterAcl() {
            return fWriters;
        }
        @Override
        public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getReaderAcl(), user);
        }

        @Override
        public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getWriterAcl(), user);
        }
        @Override
        public void permitWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, true, pubId);
        }

        @Override
        public void denyWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, false, pubId);
        }

        @Override
        public void permitReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, true, consumerId);
        }

        @Override
        public void denyReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, false, consumerId);
        }
        private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
                throws ConfigDbException, AccessDeniedException {
            final NsaAcl acl = NsaAclUtils.updateAcl(this, asUser, key, reader, add);

            // we have to assume we have current data, or load it again. for the expected use
            // case, assuming we can overwrite the data is fine.
            final JSONObject o = new JSONObject();
            o.put("owner", fOwner);
            o.put("description", fDesc); // preserved so the ACL update doesn't drop them
            o.put("txenabled", fTransactionEnabled);
            o.put("readers", safeSerialize(reader ? acl : fReaders));
            o.put("writers", safeSerialize(reader ? fWriters : acl));
            fConfigDb.store(fBaseTopicData.getChild(fName), o.toString());

            log.info("ACL_UPDATE: " + asUser.getKey() + " " + (add ? "added " : "removed ")
                    + (reader ? "subscriber" : "publisher") + " " + key + " on " + fName);
        }
        private JSONObject safeSerialize(NsaAcl acl) {
            return acl == null ? null : acl.serialize();
        }
        private final String fName;
        private final ConfigDb fConfigDb;
        private final ConfigPath fBaseTopicData;
        private final String fOwner;
        private final String fDesc;
        private final NsaAcl fReaders;
        private final NsaAcl fWriters;
        private boolean fTransactionEnabled;
        @Override
        public boolean isTransactionEnabled() {
            return fTransactionEnabled;
        }

        @Override
        public Set<String> getOwners() {
            final TreeSet<String> owners = new TreeSet<String>();
            owners.add(fOwner);
            return owners;
        }
    }
}