1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.beans;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
//import org.apache.log4-j.Logger;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.json.JSONArray;
import org.json.JSONObject;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.CambriaApiException;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metabroker.Broker;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metabroker.Topic;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.ConfigurationReader;
import org.springframework.beans.factory.annotation.Qualifier;

import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
/**
 * class performing all topic operations
 */
63 public class DMaaPKafkaMetaBroker implements Broker {
65 //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
66 private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
70 * DMaaPKafkaMetaBroker constructor initializing
76 public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
77 @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
78 //fSettings = settings;
80 fCambriaConfig = configDb;
81 fBaseTopicData = configDb.parse("/topics");
85 public List<Topic> getAllTopics() throws ConfigDbException {
86 log.info("Retrieving list of all the topics.");
87 final LinkedList<Topic> result = new LinkedList<Topic>();
89 log.info("Retrieving all topics from root: " + zkTopicsRoot);
90 final List<String> topics = fZk.getChildren(zkTopicsRoot);
91 for (String topic : topics) {
92 result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
95 JSONObject dataObj = new JSONObject();
96 dataObj.put("topics", new JSONObject());
98 for (String topic : topics) {
99 dataObj.getJSONObject("topics").put(topic, new JSONObject());
101 } catch (ZkNoNodeException excp) {
102 // very fresh kafka doesn't have any topics or a topics node
103 log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
109 public Topic getTopic(String topic) throws ConfigDbException {
110 if (fZk.exists(zkTopicsRoot + "/" + topic)) {
111 return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
113 // else: no such topic in kafka
118 * static method get KafkaTopic object
124 * @throws ConfigDbException
126 public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
127 return new KafkaTopic(topic, db, base);
134 public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
135 boolean transactionEnabled) throws TopicExistsException, CambriaApiException {
136 log.info("Creating topic: " + topic);
138 log.info("Check if topic [" + topic + "] exist.");
139 // first check for existence "our way"
140 final Topic t = getTopic(topic);
142 log.info("Could not create topic [" + topic + "]. Topic Already exists.");
143 throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
145 } catch (ConfigDbException e1) {
146 log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
147 throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
148 "Couldn't check topic data in config db.");
151 // we only allow 3 replicas. (If we don't test this, we get weird
152 // results from the cluster,
153 // so explicit test and fail.)
154 if (replicas < 1 || replicas > 3) {
155 log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
156 throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
157 "The replica count must be between 1 and 3.");
159 if (partitions < 1) {
160 log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
161 throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
166 ZkClient zkClient = null;
168 log.info("Loading zookeeper client for creating topic.");
169 // FIXME: use of this scala module$ thing is a goofy hack to
170 // make Kafka aware of the
171 // topic creation. (Otherwise, the topic is only partially
173 zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
174 ZKStringSerializer$.MODULE$);
176 log.info("Zookeeper client loaded successfully. Creating topic.");
177 AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties());
178 } catch (kafka.common.TopicExistsException e) {
179 log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e);
180 throw new TopicExistsException(topic);
181 } catch (ZkNoNodeException e) {
182 log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e);
183 // Kafka throws this when the server isn't running (and perhaps
185 throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
186 "The Kafka cluster is not setup.");
187 } catch (kafka.admin.AdminOperationException e) {
188 // Kafka throws this when the server isn't running (and perhaps
190 log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(),
192 throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
193 "The Kafka cluster can't handle your request. Talk to the administrators.");
195 log.info("Closing zookeeper connection.");
196 if (zkClient != null)
200 log.info("Creating topic entry for topic: " + topic);
201 // underlying Kafka topic created. now setup our API info
202 return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
203 } catch (ConfigDbException excp) {
204 log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp);
205 throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
206 "Failed to create topic data. Talk to the administrators.");
211 public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException {
212 log.info("Deleting topic: " + topic);
213 ZkClient zkClient = null;
215 log.info("Loading zookeeper client for topic deletion.");
216 // FIXME: use of this scala module$ thing is a goofy hack to make
217 // Kafka aware of the
218 // topic creation. (Otherwise, the topic is only partially created
220 zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
221 ZKStringSerializer$.MODULE$);
223 log.info("Zookeeper client loaded successfully. Deleting topic.");
224 AdminUtils.deleteTopic(zkClient, topic);
225 } catch (kafka.common.TopicExistsException e) {
226 log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
227 throw new TopicExistsException(topic);
228 } catch (ZkNoNodeException e) {
229 log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not setup." + e.getMessage(), e);
230 // Kafka throws this when the server isn't running (and perhaps
232 throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not setup.");
233 } catch (kafka.admin.AdminOperationException e) {
234 // Kafka throws this when the server isn't running (and perhaps
236 log.error("The Kafka cluster can't handle your request. Talk to the administrators." + e.getMessage(), e);
237 throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
238 "The Kafka cluster can't handle your request. Talk to the administrators.");
240 log.info("Closing zookeeper connection.");
241 if (zkClient != null)
245 // throw new UnsupportedOperationException ( "We can't programmatically
246 // delete Kafka topics yet." );
	//private final rrNvReadable fSettings;
	// ZooKeeper client used to list and existence-check Kafka topics.
	private final ZkClient fZk;
	// Config DB storing API-level topic metadata (owner, description, ACLs).
	private final ConfigDb fCambriaConfig;
	// Parsed config-DB path under which per-topic entries live ("/topics").
	private final ConfigPath fBaseTopicData;
	// ZooKeeper node under which Kafka registers topics.
	private static final String zkTopicsRoot = "/brokers/topics";
	// Shared empty-ACL marker used for owned topics with no explicit ACL JSON.
	private static final JSONObject kEmptyAcl = new JSONObject();
258 * method Providing KafkaTopic Object associated with owner and
259 * transactionenabled or not
264 * @param transactionEnabled
266 * @throws ConfigDbException
268 public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
269 throws ConfigDbException {
270 return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
274 * static method giving kafka topic object
281 * @param transactionEnabled
283 * @throws ConfigDbException
285 public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
286 boolean transactionEnabled) throws ConfigDbException {
287 final JSONObject o = new JSONObject();
288 o.put("owner", owner);
289 o.put("description", desc);
290 o.put("txenabled", transactionEnabled);
291 db.store(basePath.getChild(name), o.toString());
292 return new KafkaTopic(name, db, basePath);
296 * class performing all user opearation like user is eligible to read,
297 * write. permitting a user to write and read,
302 public static class KafkaTopic implements Topic {
304 * constructor initializes
309 * @throws ConfigDbException
311 public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
313 fConfigDb = configdb;
314 fBaseTopicData = baseTopic;
316 String data = fConfigDb.load(fBaseTopicData.getChild(fName));
321 final JSONObject o = new JSONObject(data);
322 fOwner = o.optString("owner", "");
323 fDesc = o.optString("description", "");
324 fTransactionEnabled = o.optBoolean("txenabled", false);// default
327 // if this topic has an owner, it needs both read/write ACLs. If there's no
328 // owner (or it's empty), null is okay -- this is for existing or implicitly
330 JSONObject readers = o.optJSONObject ( "readers" );
331 if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
332 fReaders = fromJson ( readers );
334 JSONObject writers = o.optJSONObject ( "writers" );
335 if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
336 fWriters = fromJson ( writers );
338 private NsaAcl fromJson(JSONObject o) {
339 NsaAcl acl = new NsaAcl();
341 JSONArray a = o.optJSONArray("allowed");
343 for (int i = 0; i < a.length(); ++i) {
344 String user = a.getString(i);
352 public String getName() {
357 public String getOwner() {
362 public String getDescription() {
367 public NsaAcl getReaderAcl() {
372 public NsaAcl getWriterAcl() {
377 public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
378 NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user );
382 public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
383 NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user );
387 public void permitWritesFromUser(String pubId, NsaApiKey asUser)
388 throws ConfigDbException, AccessDeniedException {
389 updateAcl(asUser, false, true, pubId);
393 public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
394 updateAcl(asUser, false, false, pubId);
398 public void permitReadsByUser(String consumerId, NsaApiKey asUser)
399 throws ConfigDbException, AccessDeniedException {
400 updateAcl(asUser, true, true, consumerId);
404 public void denyReadsByUser(String consumerId, NsaApiKey asUser)
405 throws ConfigDbException, AccessDeniedException {
406 updateAcl(asUser, true, false, consumerId);
409 private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
410 throws ConfigDbException, AccessDeniedException{
413 final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
415 // we have to assume we have current data, or load it again. for the expected use
416 // case, assuming we can overwrite the data is fine.
417 final JSONObject o = new JSONObject ();
418 o.put ( "owner", fOwner );
419 o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
420 o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
421 fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
423 log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
426 catch ( ConfigDbException x )
430 catch ( AccessDeniedException x )
437 private JSONObject safeSerialize(NsaAcl acl) {
438 return acl == null ? null : acl.serialize();
		// Topic name; also the config-DB child key for this topic's record.
		private final String fName;
		// Backing config store for this topic's metadata.
		private final ConfigDb fConfigDb;
		// Base config path under which this topic's entry lives.
		private final ConfigPath fBaseTopicData;
		// Owning API key; empty string when the topic has no owner.
		private final String fOwner;
		// Human-readable description loaded from the stored record.
		private final String fDesc;
		// Reader (subscriber) and writer (publisher) ACLs from stored JSON.
		private final NsaAcl fReaders;
		private final NsaAcl fWriters;
		// Whether transactions are enabled ("txenabled" in the stored record).
		private boolean fTransactionEnabled;
450 public boolean isTransactionEnabled() {
451 return fTransactionEnabled;
		// NOTE(review): this definition continues beyond the visible chunk; only
		// the start of getOwners() is shown. Visible code builds a sorted set
		// seeded with fOwner — presumably returned as the owner set; confirm
		// against the full file.
		public Set<String> getOwners() {
			final TreeSet<String> owners = new TreeSet<String> ();
			owners.add ( fOwner );