/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *  
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *  
 *******************************************************************************/
package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.beans;

import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;

import org.json.JSONArray;
import org.json.JSONObject;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.CambriaApiException;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metabroker.Broker;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metabroker.Topic;
import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.ConfigurationReader;
import org.springframework.beans.factory.annotation.Qualifier;

import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
/**
 * Broker implementation performing all Kafka topic operations: listing,
 * lookup, creation and deletion, backed by the Kafka/ZooKeeper cluster and
 * the Cambria config DB.
 * 
 * @author author
 */

public class DMaaPKafkaMetaBroker implements Broker {

	private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaMetaBroker.class);

	/**
	 * DMaaPKafkaMetaBroker constructor.
	 * 
	 * @param settings property reader (injected for Spring wiring; not used directly)
	 * @param zk ZooKeeper client for the Kafka cluster
	 * @param configDb Cambria configuration store holding topic metadata
	 */
	public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
			@Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
		fZk = zk;
		fCambriaConfig = configDb;
		fBaseTopicData = configDb.parse("/topics");
	}
	@Override
	public List<Topic> getAllTopics() throws ConfigDbException {
		log.info("Retrieving list of all the topics.");
		final LinkedList<Topic> result = new LinkedList<>();
		try {
			log.info("Retrieving all topics from root: " + zkTopicsRoot);
			final List<String> topics = fZk.getChildren(zkTopicsRoot);
			for (String topic : topics) {
				result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
			}
		} catch (ZkNoNodeException excp) {
			// a very fresh Kafka install doesn't have any topics or a topics node
			log.error("ZK doesn't have a Kafka topics node at " + zkTopicsRoot, excp);
		}
		return result;
	}

	@Override
	public Topic getTopic(String topic) throws ConfigDbException {
		if (fZk.exists(zkTopicsRoot + "/" + topic)) {
			return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
		}
		// else: no such topic in Kafka
		return null;
	}

	/**
	 * Static factory returning a KafkaTopic view of the given topic's
	 * config-db entry.
	 * 
	 * @param db the config DB
	 * @param base the base path for topic data
	 * @param topic the topic name
	 * @return a KafkaTopic backed by the config DB
	 * @throws ConfigDbException
	 */
	public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
		return new KafkaTopic(topic, db, base);
	}

	/**
	 * Creates the topic in Kafka (via ZooKeeper) and then records the Cambria
	 * metadata entry for it. The replica count must be between 1 and 3, and at
	 * least one partition is required.
	 */
	@Override
	public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
			boolean transactionEnabled) throws TopicExistsException, CambriaApiException {
		log.info("Creating topic: " + topic);
		try {
			log.info("Check if topic [" + topic + "] exists.");
			// first check for existence "our way"
			final Topic t = getTopic(topic);
			if (t != null) {
				log.info("Could not create topic [" + topic + "]. Topic already exists.");
				throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
			}
		} catch (ConfigDbException e1) {
			log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
			throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
					"Couldn't check topic data in config db.");
		}

		// We only allow up to 3 replicas. (If we don't test this, we get weird
		// results from the cluster, so test explicitly and fail.)
		if (replicas < 1 || replicas > 3) {
			log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
			throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
					"The replica count must be between 1 and 3.");
		}
		if (partitions < 1) {
			log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
			throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
		}

		// create via Kafka
		try {
			ZkClient zkClient = null;
			try {
				log.info("Loading zookeeper client for creating topic.");
				// FIXME: use of this scala module$ thing is a goofy hack to make
				// Kafka aware of the topic creation. (Otherwise, the topic is
				// only partially created in ZK.)
				zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
						ZKStringSerializer$.MODULE$);

				log.info("Zookeeper client loaded successfully. Creating topic.");
				AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties());
			} catch (kafka.common.TopicExistsException e) {
				log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e);
				throw new TopicExistsException(topic);
			} catch (ZkNoNodeException e) {
				// Kafka throws this when the server isn't running (and perhaps
				// hasn't ever run)
				log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not set up.", e);
				throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
						"The Kafka cluster is not set up.");
			} catch (kafka.admin.AdminOperationException e) {
				// Kafka throws this when the server can't handle the request
				log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(),
						e);
				throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
						"The Kafka cluster can't handle your request. Talk to the administrators.");
			} finally {
				log.info("Closing zookeeper connection.");
				if (zkClient != null)
					zkClient.close();
			}

			log.info("Creating topic entry for topic: " + topic);
			// underlying Kafka topic created; now set up our API info
			return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
		} catch (ConfigDbException excp) {
			log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp);
			throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
					"Failed to create topic data. Talk to the administrators.");
		}
	}

	@Override
	public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException {
		log.info("Deleting topic: " + topic);
		ZkClient zkClient = null;
		try {
			log.info("Loading zookeeper client for topic deletion.");
			// FIXME: use of this scala module$ thing is a goofy hack to make
			// Kafka aware of the topic deletion. (Otherwise, the topic is only
			// partially deleted in ZK.)
			zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
					ZKStringSerializer$.MODULE$);

			log.info("Zookeeper client loaded successfully. Deleting topic.");
			AdminUtils.deleteTopic(zkClient, topic);
		} catch (kafka.common.TopicExistsException e) {
			log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
			throw new TopicExistsException(topic);
		} catch (ZkNoNodeException e) {
			// Kafka throws this when the server isn't running (and perhaps
			// hasn't ever run)
			log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not set up. " + e.getMessage(), e);
			throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not set up.");
		} catch (kafka.admin.AdminOperationException e) {
			// Kafka throws this when the server can't handle the request
			log.error("The Kafka cluster can't handle your request. Talk to the administrators. " + e.getMessage(), e);
			throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
					"The Kafka cluster can't handle your request. Talk to the administrators.");
		} finally {
			log.info("Closing zookeeper connection.");
			if (zkClient != null)
				zkClient.close();
		}
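
		// Note (an assumption about broker configuration, not enforced here):
		// with the old kafka.admin.AdminUtils API, deleteTopic only writes a
		// deletion marker into ZooKeeper; brokers actually remove the topic
		// only when delete.topic.enable=true is set on the cluster.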
	}

	private final ZkClient fZk;
	private final ConfigDb fCambriaConfig;
	private final ConfigPath fBaseTopicData;

	private static final String zkTopicsRoot = "/brokers/topics";
	private static final JSONObject kEmptyAcl = new JSONObject();

	/**
	 * Creates the config-db entry for a topic, recording its owner and whether
	 * transactions are enabled.
	 * 
	 * @param name the topic name
	 * @param desc the topic description
	 * @param owner the owning API key
	 * @param transactionEnabled whether the topic is transaction-enabled
	 * @return the KafkaTopic view of the new entry
	 * @throws ConfigDbException
	 */
	public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
			throws ConfigDbException {
		return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
	}

	/**
	 * Static variant of {@link #createTopicEntry(String, String, String, boolean)}
	 * that writes the topic record into the given config DB.
	 * 
	 * @param db the config DB
	 * @param basePath the base path for topic data
	 * @param name the topic name
	 * @param desc the topic description
	 * @param owner the owning API key
	 * @param transactionEnabled whether the topic is transaction-enabled
	 * @return the KafkaTopic view of the new entry
	 * @throws ConfigDbException
	 */
	public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
			boolean transactionEnabled) throws ConfigDbException {
		final JSONObject o = new JSONObject();
		o.put("owner", owner);
		o.put("description", desc);
		o.put("txenabled", transactionEnabled);
		db.store(basePath.getChild(name), o.toString());
		return new KafkaTopic(name, db, basePath);
	}
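
	/*
	 * For reference, the stored topic record written above (and later extended
	 * by KafkaTopic.updateAcl with "readers"/"writers" ACLs) looks like this;
	 * the values are illustrative only:
	 *
	 *   {
	 *     "owner": "ownerApiKey",
	 *     "description": "demo topic",
	 *     "txenabled": false,
	 *     "readers": { "allowed": [ "consumerApiKey" ] },
	 *     "writers": { "allowed": [ "publisherApiKey" ] }
	 *   }
	 */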

	/**
	 * Topic implementation that handles per-user operations: checking whether
	 * a user may read or write, and granting or denying read/write access.
	 * 
	 * @author author
	 *
	 */
	public static class KafkaTopic implements Topic {
		/**
		 * Constructor: loads the topic's record from the config DB and parses
		 * its owner, description, transaction flag and ACLs.
		 * 
		 * @param name the topic name
		 * @param configdb the config DB
		 * @param baseTopic the base path for topic data
		 * @throws ConfigDbException
		 */
		public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
			fName = name;
			fConfigDb = configdb;
			fBaseTopicData = baseTopic;

			String data = fConfigDb.load(fBaseTopicData.getChild(fName));
			if (data == null) {
				data = "{}";
			}

			final JSONObject o = new JSONObject(data);
			fOwner = o.optString("owner", "");
			fDesc = o.optString("description", "");
			fTransactionEnabled = o.optBoolean("txenabled", false); // default value is false

			// if this topic has an owner, it needs both read/write ACLs. If there's no
			// owner (or it's empty), null is okay -- this is for existing or implicitly
			// created topics.
			JSONObject readers = o.optJSONObject("readers");
			if (readers == null && fOwner.length() > 0)
				readers = kEmptyAcl;
			fReaders = fromJson(readers);

			JSONObject writers = o.optJSONObject("writers");
			if (writers == null && fOwner.length() > 0)
				writers = kEmptyAcl;
			fWriters = fromJson(writers);
		}
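
		/**
		 * Builds an NsaAcl from its JSON form; the expected shape is
		 * {"allowed": ["key1", "key2", ...]}. A null input yields an empty ACL.
		 */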
		private NsaAcl fromJson(JSONObject o) {
			NsaAcl acl = new NsaAcl();
			if (o != null) {
				JSONArray a = o.optJSONArray("allowed");
				if (a != null) {
					for (int i = 0; i < a.length(); ++i) {
						String user = a.getString(i);
						acl.add(user);
					}
				}
			}
			return acl;
		}

		@Override
		public String getName() {
			return fName;
		}

		@Override
		public String getOwner() {
			return fOwner;
		}

		@Override
		public String getDescription() {
			return fDesc;
		}

		@Override
		public NsaAcl getReaderAcl() {
			return fReaders;
		}

		@Override
		public NsaAcl getWriterAcl() {
			return fWriters;
		}

		@Override
		public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
			NsaAclUtils.checkUserAccess(fOwner, getReaderAcl(), user);
		}

		@Override
		public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
			NsaAclUtils.checkUserAccess(fOwner, getWriterAcl(), user);
		}

		@Override
		public void permitWritesFromUser(String pubId, NsaApiKey asUser)
				throws ConfigDbException, AccessDeniedException {
			updateAcl(asUser, false, true, pubId);
		}

		@Override
		public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
			updateAcl(asUser, false, false, pubId);
		}

		@Override
		public void permitReadsByUser(String consumerId, NsaApiKey asUser)
				throws ConfigDbException, AccessDeniedException {
			updateAcl(asUser, true, true, consumerId);
		}

		@Override
		public void denyReadsByUser(String consumerId, NsaApiKey asUser)
				throws ConfigDbException, AccessDeniedException {
			updateAcl(asUser, true, false, consumerId);
		}

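		/**
		 * Central ACL mutation used by the permit/deny methods above: "reader"
		 * selects the subscriber vs. publisher ACL, "add" selects grant vs.
		 * revoke, and "key" is the API key being granted or revoked. The whole
		 * topic record is rewritten with the updated ACL.
		 */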
		private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
				throws ConfigDbException, AccessDeniedException {
			final NsaAcl acl = NsaAclUtils.updateAcl(this, asUser, key, reader, add);

			// we have to assume we have current data, or load it again. for the expected
			// use case, assuming we can overwrite the data is fine.
			final JSONObject o = new JSONObject();
			o.put("owner", fOwner);
			o.put("readers", safeSerialize(reader ? acl : fReaders));
			o.put("writers", safeSerialize(reader ? fWriters : acl));
			fConfigDb.store(fBaseTopicData.getChild(fName), o.toString());

			log.info("ACL_UPDATE: " + asUser.getKey() + " " + (add ? "added " : "removed ")
					+ (reader ? "subscriber" : "publisher") + " " + key + " on " + fName);
		}

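		/** Serializes an ACL to JSON, tolerating a null (absent) ACL. */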
		private JSONObject safeSerialize(NsaAcl acl) {
			return acl == null ? null : acl.serialize();
		}

		private final String fName;
		private final ConfigDb fConfigDb;
		private final ConfigPath fBaseTopicData;
		private final String fOwner;
		private final String fDesc;
		private final NsaAcl fReaders;
		private final NsaAcl fWriters;
		private boolean fTransactionEnabled;

		public boolean isTransactionEnabled() {
			return fTransactionEnabled;
		}

		@Override
		public Set<String> getOwners() {
			final TreeSet<String> owners = new TreeSet<>();
			owners.add(fOwner);
			return owners;
		}
	}

}