/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package com.att.nsa.cambria.beans;

import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Qualifier;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.cambria.CambriaApiException;
import com.att.nsa.cambria.metabroker.Broker;
import com.att.nsa.cambria.metabroker.Topic;
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;

import kafka.admin.AdminUtils;

/**
 * Class performing all topic operations.
 *
 * @author author
 *
 */
public class DMaaPKafkaMetaBroker implements Broker {

    private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaMetaBroker.class);

    /**
     * DMaaPKafkaMetaBroker constructor.
     *
     * @param settings
     * @param zk
     * @param configDb
     */
    public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
            @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
    }

    @Override
    public List<Topic> getAllTopics() throws ConfigDbException {
        log.info("Retrieving list of all the topics.");
        final LinkedList<Topic> result = new LinkedList<>();
        try {
            log.info("Retrieving all topics from root: " + zkTopicsRoot);
            final List<String> topics = fZk.getChildren(zkTopicsRoot);
            for (String topic : topics) {
                result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
            }
        } catch (ZkNoNodeException excp) {
            // a very fresh Kafka install doesn't have any topics or a topics node
            log.error("ZK doesn't have a Kafka topics node at " + zkTopicsRoot, excp);
        }
        return result;
    }
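
    // A minimal usage sketch (hypothetical caller code, not part of this class;
    // it assumes the broker bean has been wired by Spring as configured above):
    //
    //   DMaaPKafkaMetaBroker broker = ...; // injected
    //   for (Topic t : broker.getAllTopics()) {
    //       System.out.println(t.getName() + " (owner: " + t.getOwner() + ")");
    //   }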

    @Override
    public Topic getTopic(String topic) throws ConfigDbException {
        if (fZk.exists(zkTopicsRoot + "/" + topic)) {
            return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
        }
        // else: no such topic in Kafka
        return null;
    }

    /**
     * Static factory returning the KafkaTopic object for a given topic name.
     *
     * @param db
     * @param base
     * @param topic
     * @return
     * @throws ConfigDbException
     */
    public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
        return new KafkaTopic(topic, db, base);
    }

    /**
     * Creates a topic.
     */
    @Override
    public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
            boolean transactionEnabled) throws TopicExistsException, CambriaApiException {
        log.info("Creating topic: " + topic);
        try {
            log.info("Check if topic [" + topic + "] exists.");
            // first check for existence "our way"
            final Topic t = getTopic(topic);
            if (t != null) {
                log.info("Could not create topic [" + topic + "]. Topic already exists.");
                throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
            }
        } catch (ConfigDbException e1) {
            log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                    "Couldn't check topic data in config db.");
        }

        // We only allow 1 to 3 replicas. (If we don't test this, we get weird
        // results from the cluster, so explicit test and fail.)
        if (replicas < 1 || replicas > 3) {
            log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
                    "The replica count must be between 1 and 3.");
        }
        if (partitions < 1) {
            log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
        }

        // create via Kafka
        try {
            ZkClient zkClient = null;
            try {
                log.info("Loading zookeeper client for creating topic.");
                // FIXME: use of the scala module$ thing is a goofy hack to make Kafka
                // aware of the topic creation. (Otherwise, the topic is only partially
                // created in ZK.)
                zkClient = ZkClientFactory.createZkClient();
                log.info("Zookeeper client loaded successfully. Creating topic.");
                AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties());
            } catch (kafka.common.TopicExistsException e) {
                log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e);
                throw new TopicExistsException(topic);
            } catch (ZkNoNodeException e) {
                // Kafka throws this when the server isn't running (and perhaps hasn't ever run)
                log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e);
                throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                        "The Kafka cluster is not setup.");
            } catch (kafka.admin.AdminOperationException e) {
                // Kafka throws this when the server can't handle the request
                log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(),
                        e);
                throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                        "The Kafka cluster can't handle your request. Talk to the administrators.");
            } finally {
                log.info("Closing zookeeper connection.");
                if (zkClient != null)
                    zkClient.close();
            }

            log.info("Creating topic entry for topic: " + topic);
            // underlying Kafka topic created; now set up our API info
            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
        } catch (ConfigDbException excp) {
            log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                    "Failed to create topic data. Talk to the administrators.");
        }
    }
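
    // Hedged example of the create/exists flow (names are illustrative only):
    //
    //   try {
    //       broker.createTopic("org.demo.topic", "demo topic", "myApiKey", 3, 3, false);
    //   } catch (TopicExistsException e) {
    //       // a duplicate name lands here; replica counts outside 1..3 and
    //       // partition counts below 1 fail earlier with a 400 CambriaApiException
    //   }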

    @Override
    public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException {
        log.info("Deleting topic: " + topic);
        ZkClient zkClient = null;
        try {
            log.info("Loading zookeeper client for topic deletion.");
            // FIXME: use of the scala module$ thing is a goofy hack to make Kafka
            // aware of the topic deletion. (Otherwise, the topic is only partially
            // deleted in ZK.)
            zkClient = ZkClientFactory.createZkClient();

            log.info("Zookeeper client loaded successfully. Deleting topic.");
            AdminUtils.deleteTopic(zkClient, topic);
        } catch (kafka.common.TopicExistsException e) {
            log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
            throw new TopicExistsException(topic);
        } catch (ZkNoNodeException e) {
            // Kafka throws this when the server isn't running (and perhaps hasn't ever run)
            log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not setup. " + e.getMessage(), e);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not setup.");
        } catch (kafka.admin.AdminOperationException e) {
            // Kafka throws this when the server can't handle the request
            log.error("The Kafka cluster can't handle your request. Talk to the administrators. " + e.getMessage(), e);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                    "The Kafka cluster can't handle your request. Talk to the administrators.");
        } finally {
            log.info("Closing zookeeper connection.");
            if (zkClient != null)
                zkClient.close();
        }
    }

    private final ZkClient fZk;
    private final ConfigDb fCambriaConfig;
    private final ConfigPath fBaseTopicData;

    private static final String zkTopicsRoot = "/brokers/topics";
    private static final JSONObject kEmptyAcl = new JSONObject();

    /**
     * Creates the KafkaTopic entry (config-db record) for the given name, owner,
     * and transaction-enabled flag.
     *
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return
     * @throws ConfigDbException
     */
    public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
            throws ConfigDbException {
        return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
    }

    /**
     * Static variant that stores the topic record in the given config db and
     * returns the resulting KafkaTopic object.
     *
     * @param db
     * @param basePath
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return
     * @throws ConfigDbException
     */
    public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
            boolean transactionEnabled) throws ConfigDbException {
        final JSONObject o = new JSONObject();
        o.put("owner", owner);
        o.put("description", desc);
        o.put("txenabled", transactionEnabled);
        db.store(basePath.getChild(name), o.toString());
        return new KafkaTopic(name, db, basePath);
    }
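
    // For reference, the config-db record written above has roughly this shape
    // (a sketch; the field values are examples):
    //
    //   { "owner": "myApiKey", "description": "demo topic", "txenabled": false }
    //
    // Reader/writer ACLs are added later via KafkaTopic.updateAcl below.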

    /**
     * Class representing a Kafka topic and its per-user permissions: whether a
     * user may read or write, and granting or revoking that access.
     *
     * @author author
     *
     */
    public static class KafkaTopic implements Topic {
        /**
         * Constructor; loads the topic's stored settings from the config db.
         *
         * @param name
         * @param configdb
         * @param baseTopic
         * @throws ConfigDbException
         */
        public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
            fName = name;
            fConfigDb = configdb;
            fBaseTopicData = baseTopic;

            String data = fConfigDb.load(fBaseTopicData.getChild(fName));
            if (data == null) {
                data = "{}";
            }

            final JSONObject o = new JSONObject(data);
            fOwner = o.optString("owner", "");
            fDesc = o.optString("description", "");
            fTransactionEnabled = o.optBoolean("txenabled", false); // default value is false

            // If this topic has an owner, it needs both read/write ACLs. If there's no
            // owner (or it's empty), null is okay -- this is for existing or implicitly
            // created topics.
            JSONObject readers = o.optJSONObject("readers");
            if (readers == null && fOwner.length() > 0)
                readers = kEmptyAcl;
            fReaders = fromJson(readers);

            JSONObject writers = o.optJSONObject("writers");
            if (writers == null && fOwner.length() > 0)
                writers = kEmptyAcl;
            fWriters = fromJson(writers);
        }

        private NsaAcl fromJson(JSONObject o) {
            final NsaAcl acl = new NsaAcl();
            if (o != null) {
                final JSONArray a = o.optJSONArray("allowed");
                if (a != null) {
                    for (int i = 0; i < a.length(); ++i) {
                        acl.add(a.getString(i));
                    }
                }
            }
            return acl;
        }
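
        // fromJson expects the ACL serialization used by this class, e.g. (sketch):
        //
        //   { "allowed": [ "apiKey1", "apiKey2" ] }
        //
        // A null object or one without an "allowed" array yields an empty NsaAcl.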

        @Override
        public String getName() {
            return fName;
        }

        @Override
        public String getOwner() {
            return fOwner;
        }

        @Override
        public String getDescription() {
            return fDesc;
        }

        @Override
        public NsaAcl getReaderAcl() {
            return fReaders;
        }

        @Override
        public NsaAcl getWriterAcl() {
            return fWriters;
        }

        @Override
        public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getReaderAcl(), user);
        }

        @Override
        public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getWriterAcl(), user);
        }

        @Override
        public void permitWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, true, pubId);
        }

        @Override
        public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, false, pubId);
        }

        @Override
        public void permitReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, true, consumerId);
        }

        @Override
        public void denyReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, false, consumerId);
        }

        private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
                throws ConfigDbException, AccessDeniedException {
            final NsaAcl acl = NsaAclUtils.updateAcl(this, asUser, key, reader, add);

            // We have to assume we have current data, or load it again. For the expected
            // use case, assuming we can overwrite the data is fine.
            final JSONObject o = new JSONObject();
            o.put("owner", fOwner);
            o.put("readers", safeSerialize(reader ? acl : fReaders));
            o.put("writers", safeSerialize(reader ? fWriters : acl));
            fConfigDb.store(fBaseTopicData.getChild(fName), o.toString());

            log.info("ACL_UPDATE: " + asUser.getKey() + " " + (add ? "added " : "removed ")
                    + (reader ? "subscriber" : "publisher") + " " + key + " on " + fName);
        }
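
        // After an ACL update, the stored topic record looks roughly like this
        // (a sketch; the keys match what updateAcl writes above, values are examples):
        //
        //   {
        //     "owner": "ownerKey",
        //     "readers": { "allowed": [ "consumerKey" ] },
        //     "writers": { "allowed": [ "publisherKey" ] }
        //   }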

        private JSONObject safeSerialize(NsaAcl acl) {
            return acl == null ? null : acl.serialize();
        }

        private final String fName;
        private final ConfigDb fConfigDb;
        private final ConfigPath fBaseTopicData;
        private final String fOwner;
        private final String fDesc;
        private final NsaAcl fReaders;
        private final NsaAcl fWriters;
        private final boolean fTransactionEnabled;

        public boolean isTransactionEnabled() {
            return fTransactionEnabled;
        }

        @Override
        public Set<String> getOwners() {
            final TreeSet<String> owners = new TreeSet<>();
            owners.add(fOwner);
            return owners;
        }
    }

}