1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
24 import com.att.eelf.configuration.EELFLogger;
25 import com.att.eelf.configuration.EELFManager;
26 import com.att.nsa.configs.ConfigDb;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.configs.ConfigPath;
29 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
30 import com.att.nsa.drumlin.till.nv.rrNvReadable;
31 import com.att.nsa.security.NsaAcl;
32 import com.att.nsa.security.NsaAclUtils;
33 import com.att.nsa.security.NsaApiKey;
34 import org.I0Itec.zkclient.ZkClient;
35 import org.I0Itec.zkclient.exception.ZkNoNodeException;
36 import org.apache.kafka.clients.admin.AdminClient;
37 import org.apache.kafka.clients.admin.AdminClientConfig;
38 import org.apache.kafka.clients.admin.CreateTopicsResult;
39 import org.apache.kafka.clients.admin.ListTopicsResult;
40 import org.apache.kafka.clients.admin.NewTopic;
41 import org.apache.kafka.common.KafkaFuture;
42 import org.json.JSONArray;
43 import org.json.JSONObject;
44 import org.onap.dmaap.dmf.mr.CambriaApiException;
45 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
46 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
47 import org.onap.dmaap.dmf.mr.metabroker.Topic;
48 import org.onap.dmaap.dmf.mr.utils.Utils;
49 import org.springframework.beans.factory.annotation.Qualifier;
50 import org.springframework.util.StringUtils;
53 import java.util.concurrent.ExecutionException;
57 * class performing all topic operations
59 * @author anowarul.islam
// Broker1 implementation that manages Kafka topics through an AdminClient and
// keeps per-topic metadata (owner, description, ACLs, tx flag) in a ConfigDb
// backed by ZooKeeper. Topic listing/lookup can come from ZK or Kafka directly,
// selected at startup by the "useZkTopicStore" environment variable.
63 public class DMaaPKafkaMetaBroker implements Broker1 {
// EELF logger shared by the broker and the nested KafkaTopic class.
65 private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class);
// Kafka admin connection used for create/delete/list topic operations.
66 private final AdminClient fKafkaAdminClient;
// true (default) -> read the topic list from ZooKeeper; false -> query Kafka.
// Read once from the environment at class-load time, so it cannot change at runtime.
67 private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(System.getenv().
68 getOrDefault("useZkTopicStore", "true"));
// Direct ZooKeeper client, used only on the ZK path (getTopicsFromZookeeper/getTopic).
69 private final ZkClient fZk;
// Config store holding per-topic metadata documents.
70 private final ConfigDb fCambriaConfig;
// Root path ("/topics") under which each topic's metadata document lives.
71 private final ConfigPath fBaseTopicData;
// ZooKeeper node under which Kafka registers its topics.
72 private static final String ZK_TOPICS_ROOT = "/brokers/topics";
// NOTE(review): kEmptyAcl is not referenced anywhere in the visible code — confirm
// whether it is used elsewhere in the file or is dead.
73 private static final JSONObject kEmptyAcl = new JSONObject();
// No-arg constructor: builds only the Kafka AdminClient; the ConfigDb fields are
// left null, so ZK-backed metadata operations are unavailable on this instance.
// NOTE(review): the listing omits original lines 76, 82, 85, 87, 90 and the closing
// braces (likely including an fZk = null; assignment) — confirm against full source.
75 public DMaaPKafkaMetaBroker() {
77 fCambriaConfig = null;
78 fBaseTopicData = null;
79 final Properties props = new Properties ();
// Broker list comes from the MsgRtr properties file; falls back to localhost below.
80 String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
81 "kafka.metadata.broker.list");
83 if (StringUtils.isEmpty(fkafkaBrokers)) {
// Default bootstrap server when no broker list is configured.
84 fkafkaBrokers = "localhost:9092";
86 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
// When CADI/AAF security is enabled, add SASL credentials to the client config.
88 if(Utils.isCadiEnabled()){
89 props.putAll(Utils.addSaslProps());
91 fKafkaAdminClient=AdminClient.create ( props );
95 * DMaaPKafkaMetaBroker constructor initializing
// Spring-wired constructor: takes the property reader, ZK client and ConfigDb beans,
// parses the "/topics" metadata root, and builds the Kafka AdminClient.
// NOTE(review): the listing omits original line 103 (presumably fZk = zk;) and the
// if/constructor closing braces — confirm against full source. The `settings`
// parameter is not used in the visible body.
101 public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
102 @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
104 fCambriaConfig = configDb;
105 fBaseTopicData = configDb.parse("/topics");
106 final Properties props = new Properties ();
107 String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
108 "kafka.metadata.broker.list");
// NOTE(review): this ctor only null-checks the broker list while the no-arg ctor
// uses StringUtils.isEmpty — an empty (non-null) property slips through here.
110 if (null == fkafkaBrokers) {
111 fkafkaBrokers = "localhost:9092";
113 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
// Add SASL credentials when CADI/AAF security is enabled.
115 if(Utils.isCadiEnabled()){
116 props.putAll(Utils.addSaslProps());
118 fKafkaAdminClient=AdminClient.create ( props );
// Injection-friendly constructor (used by tests): accepts a pre-built AdminClient
// instead of creating one from properties.
// NOTE(review): the listing omits original line 122 (presumably fZk = zk;) — confirm.
121 public DMaaPKafkaMetaBroker(ZkClient zk, ConfigDb configDb,AdminClient client) {
123 fCambriaConfig = configDb;
124 fBaseTopicData = configDb.parse("/topics");
125 fKafkaAdminClient= client;
// Returns every known topic, sourced from Kafka or ZooKeeper depending on the
// useZkTopicStore switch (GET_TOPICS_FROM_ZK).
129 public List<Topic> getAllTopics() throws ConfigDbException {
130 log.info("Retrieving list of all the topics.");
131 if (!GET_TOPICS_FROM_ZK) {
132 return getTopicsFromKafka();
// Default/legacy path: enumerate topics registered under ZK's /brokers/topics.
134 return getTopicsFromZookeeper();
// Lists topic names via the Kafka AdminClient and wraps each as a KafkaTopic
// (which loads its metadata from the ConfigDb). On failure the error is logged;
// NOTE(review): the listing omits the try opener and the trailing return (original
// lines 140, 146-149), so the failure path presumably returns the partial list —
// confirm against full source.
137 private LinkedList<Topic> getTopicsFromKafka() throws ConfigDbException {
138 LinkedList<Topic> res = new LinkedList<>();
139 final ListTopicsResult ltr = fKafkaAdminClient.listTopics();
// names().get() blocks until the broker responds (no explicit timeout here).
141 for (String name: ltr.names().get()) {
142 res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData));
// NOTE(review): InterruptedException is caught without re-interrupting the thread
// (Thread.currentThread().interrupt()) — flag for follow-up.
144 } catch (InterruptedException | ExecutionException e) {
145 log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from kafka.", e);
// Legacy listing path: reads topic names as children of /brokers/topics in ZK
// and wraps each as a KafkaTopic. A missing node is treated as "no topics yet".
150 private LinkedList<Topic> getTopicsFromZookeeper() throws ConfigDbException {
151 final LinkedList<Topic> legacyResult = new LinkedList<>();
153 log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT);
154 final List<String> topics = fZk.getChildren(ZK_TOPICS_ROOT);
155 for (String topic : topics) {
156 legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
// NOTE(review): dataObj is built here but never stored or returned in the visible
// code — confirm whether the omitted lines use it or it is dead work.
158 JSONObject dataObj = new JSONObject();
159 dataObj.put("topics", new JSONObject());
161 for (String topic : topics) {
162 dataObj.getJSONObject("topics").put(topic, new JSONObject());
// A brand-new Kafka install has no /brokers/topics node yet; log and fall through.
164 } catch (ZkNoNodeException excp) {
165 // very fresh kafka doesn't have any topics or a topics node
// NOTE(review): "Kakfa" typo in the log message below (runtime string — left as-is).
166 log.error("ZK doesn't have a Kakfa topics node at " + ZK_TOPICS_ROOT, excp);
// Looks up a single topic. Kafka path: linear scan of listTopics() names for an
// exact match. ZK path: existence check of /brokers/topics/<topic>. Returns the
// topic's ConfigDb-backed metadata wrapper when found.
// NOTE(review): the listing omits the try opener, several closers, and the final
// "no such topic" return (original lines 174, 179-180, 183-184, 187, 189-190) —
// presumably returns null when not found; confirm against full source.
172 public Topic getTopic(String topic) throws ConfigDbException {
173 if (!GET_TOPICS_FROM_ZK) {
175 for (String name : fKafkaAdminClient.listTopics().names().get()) {
176 if (name.equals(topic)) {
177 log.debug("TOPIC_NAME: {} is equal to : {}", name, topic);
178 return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
// NOTE(review): InterruptedException caught without re-interrupt — flag for follow-up.
181 } catch (InterruptedException | ExecutionException e) {
182 log.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e);
185 } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) {
186 return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
188 // else: no such topic
193 * static method get KafkaTopic object
// Loads the topic's metadata document from the given ConfigDb/base path and
// returns it wrapped as a KafkaTopic.
199 * @throws ConfigDbException
201 public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
202 return new KafkaTopic(topic, db, base);
// Creates a Kafka topic and its metadata entry. Flow: (1) reject if the topic
// already exists in our store, (2) validate replica count (1..3) and partition
// count (>=1), (3) create the topic via AdminClient, (4) write the metadata entry.
// NOTE(review): the listing omits the try openers, the "t != null" existence check,
// and (likely) a blocking ctrResult.get() at original line 243 — confirm.
209 public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
210 boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
211 log.info("Creating topic: {}", topic);
213 log.info("Check if topic [{}] exist.", topic);
214 // first check for existence "our way"
215 final Topic t = getTopic(topic);
217 log.info("Could not create topic [{}]. Topic Already exists.", topic);
// NOTE(review): "Alreay" typo in the exception message below (runtime string — left as-is).
218 throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
// If the existence check itself fails, report service-unavailable rather than
// risking a duplicate create.
220 } catch (ConfigDbException e1) {
221 log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
222 throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
223 "Couldn't check topic data in config db.");
226 // we only allow 3 replicas. (If we don't test this, we get weird
227 // results from the cluster,
228 // so explicit test and fail.)
229 if (replicas < 1 || replicas > 3) {
230 log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic);
231 throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
232 "The replica count must be between 1 and 3.");
234 if (partitions < 1) {
235 log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic);
236 throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
// Ask Kafka to create the topic with the validated partition/replica counts.
240 final NewTopic topicRequest = new NewTopic(topic, partitions, (short)replicas);
241 final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
242 final KafkaFuture<Void> ctrResult = ctr.all();
244 // underlying Kafka topic created. now setup our API info
245 return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
// NOTE(review): both log messages below say "describeTopics" although this method
// performs createTopics; InterruptedException is also not re-interrupted — flag.
246 } catch (InterruptedException e) {
247 log.warn("Execution of describeTopics timed out.");
248 throw new ConfigDbException(e);
249 } catch (ExecutionException e) {
250 log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
251 throw new ConfigDbException(e.getCause());
// Deletes the Kafka topic via the AdminClient; any failure is wrapped as a
// ConfigDbException. The metadata entry in ConfigDb is not removed in the visible code.
// NOTE(review): the "zookeeper client" log messages are leftovers from an older
// ZK-based implementation — the actual delete goes through fKafkaAdminClient.
257 public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
258 log.info("Deleting topic: {}", topic);
260 log.info("Loading zookeeper client for topic deletion.");
261 // topic creation. (Otherwise, the topic is only partially created
263 fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
264 log.info("Zookeeper client loaded successfully. Deleting topic.");
// Broad catch: any AdminClient failure surfaces as ConfigDbException to the caller.
265 } catch (Exception e) {
266 log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
267 throw new ConfigDbException(e);
269 log.info("Closing zookeeper connection.");
274 * method Providing KafkaTopic Object associated with owner and
275 * transactionenabled or not
// Convenience overload: writes the metadata entry using this broker's own
// ConfigDb and "/topics" base path (delegates to the static variant below).
280 * @param transactionEnabled
282 * @throws ConfigDbException
284 public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
285 throws ConfigDbException {
286 return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
290 * static method giving kafka topic object
// Builds the topic metadata JSON ({owner, description, txenabled}), persists it
// under basePath/<name> only when the ZK topic store is enabled, then returns
// the KafkaTopic wrapper (which re-loads that document).
297 * @param transactionEnabled
299 * @throws ConfigDbException
301 public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
302 boolean transactionEnabled) throws ConfigDbException {
303 final JSONObject o = new JSONObject();
304 o.put("owner", owner);
305 o.put("description", desc);
306 o.put("txenabled", transactionEnabled);
// Skip the ConfigDb write when topics are sourced from Kafka directly.
307 if (GET_TOPICS_FROM_ZK) {
308 db.store(basePath.getChild(name), o.toString());
310 return new KafkaTopic(name, db, basePath);
314 * class performing all user operation like user is eligible to read,
315 * write. permitting a user to write and read etc
317 * @author anowarul.islam
// ConfigDb-backed Topic implementation: holds the topic's owner, description,
// transaction flag, and reader/writer ACLs loaded from its metadata document.
320 public static class KafkaTopic implements Topic {
322 * constructor initializes
327 * @throws ConfigDbException
// Topic name (also the child key under the base ConfigPath).
330 private final String fName;
// Store and base path used to load and persist this topic's metadata.
331 private final ConfigDb fConfigDb;
332 private final ConfigPath fBaseTopicData;
// Owner API key; an empty owner means no ACL enforcement baseline (see ctor).
333 private final String fOwner;
334 private final String fDesc;
// Reader/writer ACLs; may be null for legacy/implicitly-created topics.
335 private final NsaAcl fReaders;
336 private final NsaAcl fWriters;
// Whether transactional publishing is enabled for this topic.
337 private final boolean fTransactionEnabled;
// Loads the topic's metadata document from configdb at baseTopic/<name> and
// populates owner, description, tx flag, and the reader/writer ACLs.
// NOTE(review): the listing omits original lines 340 (presumably fName = name;),
// 345-348 (likely a null/missing-data branch), and 360-362/367-369 (the bodies of
// the "no ACL but has owner" branches) — confirm against full source.
339 public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
341 fConfigDb = configdb;
342 fBaseTopicData = baseTopic;
344 String data = fConfigDb.load(fBaseTopicData.getChild(fName));
349 final JSONObject o = new JSONObject(data);
// Missing keys fall back to empty strings / false.
350 fOwner = o.optString("owner", "");
351 fDesc = o.optString("description", "");
352 fTransactionEnabled = o.optBoolean("txenabled", false);// default
355 // if this topic has an owner, it needs both read/write ACLs. If there's no
356 // owner (or it's empty), null is okay -- this is for existing or implicitly
358 JSONObject readers = o.optJSONObject ( "readers" );
359 if ( readers == null && fOwner.length () > 0 )
363 fReaders = fromJson ( readers );
365 JSONObject writers = o.optJSONObject ( "writers" );
366 if ( writers == null && fOwner.length () > 0 )
370 fWriters = fromJson ( writers );
// Deserializes an ACL JSON object ({"allowed": [user, ...]}) into an NsaAcl.
// NOTE(review): the listing omits the null/"allowed"-missing guard, the per-user
// add call, the return, and closing braces (original lines 375, 377, 380-385) —
// confirm against full source.
373 private NsaAcl fromJson(JSONObject o) {
374 NsaAcl acl = new NsaAcl();
376 JSONArray a = o.optJSONArray("allowed");
378 for (int i = 0; i < a.length(); ++i) {
379 String user = a.getString(i);
// Simple accessors for the metadata loaded in the constructor.
// NOTE(review): the listing omits each accessor's return statement and closing
// brace (presumably fName, fOwner, fDesc, fReaders, fWriters) — confirm.
388 public String getName() {
393 public String getOwner() {
398 public String getDescription() {
403 public NsaAcl getReaderAcl() {
408 public NsaAcl getWriterAcl() {
// Throws AccessDeniedException unless `user` may consume from this topic,
// per the owner and reader ACL (delegated to NsaAclUtils).
413 public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
414 NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user );
// Same check for publishing, against the writer ACL.
418 public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
419 NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user );
// The four ACL-mutation entry points all funnel into updateAcl(asUser, reader, add, key):
// reader=false targets the writer (publisher) ACL, reader=true the reader (subscriber)
// ACL; add=true grants, add=false revokes.
423 public void permitWritesFromUser(String pubId, NsaApiKey asUser)
424 throws ConfigDbException, AccessDeniedException {
425 updateAcl(asUser, false, true, pubId);
429 public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
430 updateAcl(asUser, false, false, pubId);
434 public void permitReadsByUser(String consumerId, NsaApiKey asUser)
435 throws ConfigDbException, AccessDeniedException {
436 updateAcl(asUser, true, true, consumerId);
440 public void denyReadsByUser(String consumerId, NsaApiKey asUser)
441 throws ConfigDbException, AccessDeniedException {
442 updateAcl(asUser, true, false, consumerId);
// Applies an ACL grant/revoke (authorization is done inside NsaAclUtils.updateAcl,
// which may throw AccessDeniedException) and persists the full metadata document
// back to ConfigDb, overwriting whatever is stored.
// NOTE(review): the listing omits the try opener and original lines 460-463; the
// catch below only logs at info level, so unless the omitted lines rethrow `x`,
// failures would be silently swallowed despite the declared throws — confirm.
445 private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
446 throws ConfigDbException, AccessDeniedException{
448 final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
449 // we have to assume we have current data, or load it again. for the expected use
450 // case, assuming we can overwrite the data is fine.
451 final JSONObject o = new JSONObject ();
452 o.put ( "owner", fOwner );
// Only the ACL being modified uses the freshly-updated object; the other side
// is re-serialized from the value loaded at construction time.
453 o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
454 o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
455 fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
457 log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
458 } catch ( ConfigDbException | AccessDeniedException x ) {
459 log.info("Error when trying to update acl for key {}", key);
// Null-tolerant ACL serialization: a topic with no ACL stays null in the JSON doc.
465 private JSONObject safeSerialize(NsaAcl acl) {
466 return acl == null ? null : acl.serialize();
// Whether transactional publishing is enabled (the "txenabled" metadata flag).
469 public boolean isTransactionEnabled() {
470 return fTransactionEnabled;
// Returns the owner set for this topic; the visible code seeds a sorted set with
// the single fOwner value. The method continues past the end of this listing.
474 public Set<String> getOwners() {
475 final TreeSet<String> owners = new TreeSet<>();
476 owners.add ( fOwner );