[DMAAP-MR] Get topics from kafka option
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaMetaBroker.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.json.JSONArray;
import org.json.JSONObject;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.metabroker.Broker1;
import org.onap.dmaap.dmf.mr.metabroker.Topic;
import org.onap.dmaap.dmf.mr.utils.Utils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.StringUtils;

import java.util.*;
import java.util.concurrent.ExecutionException;

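/*
 * Illustrative usage sketch (not part of the original source; the bean names
 * are assumptions taken from the @Qualifier hints on the constructor below):
 *
 *   DMaaPKafkaMetaBroker broker =
 *       new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
 *   for (Topic t : broker.getAllTopics()) {
 *       log.info("topic {} owned by {}", t.getName(), t.getOwner());
 *   }
 */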
/**
 * Class performing all topic operations.
 *
 * @author anowarul.islam
 *
 */
//@Component
public class DMaaPKafkaMetaBroker implements Broker1 {

    private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class);
    private final AdminClient fKafkaAdminClient;
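    /*
     * Feature toggle for this change: when the "useZkTopicStore" environment
     * variable is unset or "true", topics are read from the legacy ZooKeeper
     * tree; when set to "false", they are read from Kafka via the AdminClient.
     * For example (shell, illustrative):
     *
     *   export useZkTopicStore=false   # list topics straight from Kafka
     */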
    private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(
            System.getenv().getOrDefault("useZkTopicStore", "true"));
    private final ZkClient fZk;
    private final ConfigDb fCambriaConfig;
    private final ConfigPath fBaseTopicData;
    private static final String ZK_TOPICS_ROOT = "/brokers/topics";
    private static final JSONObject kEmptyAcl = new JSONObject();

    public DMaaPKafkaMetaBroker() {
        fZk = null;
        fCambriaConfig = null;
        fBaseTopicData = null;
        final Properties props = new Properties();
        String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");

        if (StringUtils.isEmpty(fkafkaBrokers)) {
            fkafkaBrokers = "localhost:9092";
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);

        if (Utils.isCadiEnabled()) {
            props.putAll(Utils.addSaslProps());
        }
        fKafkaAdminClient = AdminClient.create(props);
    }

    /**
     * DMaaPKafkaMetaBroker constructor.
     *
     * @param settings property reader with the message-router settings
     * @param zk ZooKeeper client used for the legacy topic store
     * @param configDb config db holding per-topic metadata (owner, description, ACLs)
     */
    public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
            @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        final Properties props = new Properties();
        String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");

        // use isEmpty (not just a null check) so a blank property also falls back
        if (StringUtils.isEmpty(fkafkaBrokers)) {
            fkafkaBrokers = "localhost:9092";
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);

        if (Utils.isCadiEnabled()) {
            props.putAll(Utils.addSaslProps());
        }
        fKafkaAdminClient = AdminClient.create(props);
    }

    public DMaaPKafkaMetaBroker(ZkClient zk, ConfigDb configDb, AdminClient client) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        fKafkaAdminClient = client;
    }

    @Override
    public List<Topic> getAllTopics() throws ConfigDbException {
        log.info("Retrieving list of all the topics.");
        if (!GET_TOPICS_FROM_ZK) {
            return getTopicsFromKafka();
        }
        return getTopicsFromZookeeper();
    }

    private LinkedList<Topic> getTopicsFromKafka() throws ConfigDbException {
        LinkedList<Topic> res = new LinkedList<>();
        final ListTopicsResult ltr = fKafkaAdminClient.listTopics();
        try {
            for (String name : ltr.names().get()) {
                res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData));
            }
        } catch (InterruptedException e) {
            log.error("GetAllTopicsFromKafka: interrupted while retrieving topic list from kafka.", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from kafka.", e);
        }
        return res;
    }

    private LinkedList<Topic> getTopicsFromZookeeper() throws ConfigDbException {
        final LinkedList<Topic> legacyResult = new LinkedList<>();
        try {
            log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT);
            final List<String> topics = fZk.getChildren(ZK_TOPICS_ROOT);
            for (String topic : topics) {
                legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
            }
        } catch (ZkNoNodeException excp) {
            // a very fresh Kafka installation doesn't have any topics or a topics node yet
            log.error("ZK doesn't have a Kafka topics node at " + ZK_TOPICS_ROOT, excp);
        }
        return legacyResult;
    }

    @Override
    public Topic getTopic(String topic) throws ConfigDbException {
        if (!GET_TOPICS_FROM_ZK) {
            try {
                if (fKafkaAdminClient.listTopics().names().get().contains(topic)) {
                    log.debug("TOPIC_NAME: {} exists in kafka.", topic);
                    return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
                }
            } catch (InterruptedException e) {
                log.error("GetTopicFromKafka: interrupted while retrieving topic list from kafka.", e);
                Thread.currentThread().interrupt();
                return null;
            } catch (ExecutionException e) {
                log.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e);
                return null;
            }
        } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) {
            return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
        }
        // else: no such topic
        return null;
    }

    /**
     * Static helper returning the KafkaTopic view of a topic's config-db entry.
     *
     * @param db the config db
     * @param base base config path for topic data
     * @param topic the topic name
     * @return a KafkaTopic backed by the given config db
     * @throws ConfigDbException if the topic data cannot be read
     */
    public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
        return new KafkaTopic(topic, db, base);
    }

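    /*
     * Illustrative call (not part of the original source; the argument values
     * are placeholders):
     *
     *   Topic t = broker.createTopic("my.topic", "demo topic", "ownerApiKey", 3, 3, false);
     *
     * Replica counts outside 1..3 and partition counts below 1 are rejected
     * with a 400 before Kafka is contacted.
     */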
    /**
     * Creates the topic in Kafka and, on success, records its metadata entry.
     */
    @Override
    public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
            boolean transactionEnabled) throws TopicExistsException, CambriaApiException, ConfigDbException {
        log.info("Creating topic: {}", topic);
        try {
            log.info("Check if topic [{}] exists.", topic);
            // first check for existence "our way"
            final Topic t = getTopic(topic);
            if (t != null) {
                log.info("Could not create topic [{}]. Topic already exists.", topic);
                throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
            }
        } catch (ConfigDbException e1) {
            log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                    "Couldn't check topic data in config db.");
        }

        // We only allow up to 3 replicas. (If we don't test this, we get weird
        // results from the cluster, so explicit test and fail.)
        if (replicas < 1 || replicas > 3) {
            log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic);
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
                    "The replica count must be between 1 and 3.");
        }
        if (partitions < 1) {
            log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic);
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
        }
        // create via kafka
        try {
            final NewTopic topicRequest = new NewTopic(topic, partitions, (short) replicas);
            final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
            final KafkaFuture<Void> ctrResult = ctr.all();
            ctrResult.get();
            // underlying Kafka topic created. now setup our API info
            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
        } catch (InterruptedException e) {
            log.warn("Execution of createTopics was interrupted.");
            Thread.currentThread().interrupt();
            throw new ConfigDbException(e);
        } catch (ExecutionException e) {
            log.warn("Execution of createTopics failed: " + e.getCause().getMessage(), e);
            throw new ConfigDbException(e.getCause());
        }
    }

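    /*
     * Note: deletion removes only the Kafka topic itself; the topic's metadata
     * entry in the config db (owner, description, ACLs) is left in place.
     */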
    @Override
    public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException, ConfigDbException {
        log.info("Deleting topic: {}", topic);
        try {
            // delete the underlying Kafka topic via the admin client
            fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
            log.info("Delete request for topic [{}] submitted.", topic);
        } catch (Exception e) {
            log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
            throw new ConfigDbException(e);
        }
    }

    /**
     * Creates the topic's metadata entry, recording the owner and whether
     * transactions are enabled, and returns its KafkaTopic view.
     *
     * @param name the topic name
     * @param desc the topic description
     * @param owner the owner's API key
     * @param transactionEnabled whether transactions are enabled for the topic
     * @return the KafkaTopic view of the new entry
     * @throws ConfigDbException if the entry cannot be stored
     */
    public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
            throws ConfigDbException {
        return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
    }

    /**
     * Static helper writing the topic's config-db entry and returning its
     * KafkaTopic view.
     *
     * @param db the config db
     * @param basePath base config path for topic data
     * @param name the topic name
     * @param desc the topic description
     * @param owner the owner's API key
     * @param transactionEnabled whether transactions are enabled for the topic
     * @return the KafkaTopic view of the new entry
     * @throws ConfigDbException if the entry cannot be stored
     */
    public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
            boolean transactionEnabled) throws ConfigDbException {
        final JSONObject o = new JSONObject();
        o.put("owner", owner);
        o.put("description", desc);
        o.put("txenabled", transactionEnabled);
        if (GET_TOPICS_FROM_ZK) {
            db.store(basePath.getChild(name), o.toString());
        }
        return new KafkaTopic(name, db, basePath);
    }

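    /*
     * Shape of a topic record in the config db, as written by createTopicEntry
     * and updateAcl and read back by the KafkaTopic constructor (values are
     * illustrative, and the ACL serialization shape is inferred from the
     * fromJson round-trip below):
     *
     *   {
     *     "owner": "ownerApiKey",
     *     "description": "demo topic",
     *     "txenabled": false,
     *     "readers": { "allowed": ["consumerKey"] },
     *     "writers": { "allowed": ["publisherKey"] }
     *   }
     *
     * "readers"/"writers" appear once ACLs have been updated.
     */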
    /**
     * Class representing a topic's metadata and access control: whether a user
     * is eligible to read or write, and granting or revoking those permissions.
     *
     * @author anowarul.islam
     *
     */
    public static class KafkaTopic implements Topic {

        private final String fName;
        private final ConfigDb fConfigDb;
        private final ConfigPath fBaseTopicData;
        private final String fOwner;
        private final String fDesc;
        private final NsaAcl fReaders;
        private final NsaAcl fWriters;
        private final boolean fTransactionEnabled;

        /**
         * Constructor loading the topic's stored metadata from the config db.
         *
         * @param name the topic name
         * @param configdb the config db holding topic metadata
         * @param baseTopic base config path for topic data
         * @throws ConfigDbException if the topic data cannot be read
         */
        public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
            fName = name;
            fConfigDb = configdb;
            fBaseTopicData = baseTopic;

            String data = fConfigDb.load(fBaseTopicData.getChild(fName));
            if (data == null) {
                data = "{}";
            }

            final JSONObject o = new JSONObject(data);
            fOwner = o.optString("owner", "");
            fDesc = o.optString("description", "");
            fTransactionEnabled = o.optBoolean("txenabled", false); // default value is false

            // if this topic has an owner, it needs both read/write ACLs. If there's no
            // owner (or it's empty), null is okay -- this is for existing or implicitly
            // created topics.
            JSONObject readers = o.optJSONObject("readers");
            if (readers == null && fOwner.length() > 0) {
                readers = kEmptyAcl;
            }
            fReaders = fromJson(readers);

            JSONObject writers = o.optJSONObject("writers");
            if (writers == null && fOwner.length() > 0) {
                writers = kEmptyAcl;
            }
            fWriters = fromJson(writers);
        }

        private NsaAcl fromJson(JSONObject o) {
            NsaAcl acl = new NsaAcl();
            if (o != null) {
                JSONArray a = o.optJSONArray("allowed");
                if (a != null) {
                    for (int i = 0; i < a.length(); ++i) {
                        String user = a.getString(i);
                        acl.add(user);
                    }
                }
            }
            return acl;
        }
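
        // fromJson accepts a null object (no ACL stored) and returns an empty
        // NsaAcl; e.g. {"allowed":["key1","key2"]} yields an ACL listing both keys.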

        @Override
        public String getName() {
            return fName;
        }

        @Override
        public String getOwner() {
            return fOwner;
        }

        @Override
        public String getDescription() {
            return fDesc;
        }

        @Override
        public NsaAcl getReaderAcl() {
            return fReaders;
        }

        @Override
        public NsaAcl getWriterAcl() {
            return fWriters;
        }

        @Override
        public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getReaderAcl(), user);
        }

        @Override
        public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getWriterAcl(), user);
        }

        @Override
        public void permitWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, true, pubId);
        }

        @Override
        public void denyWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, false, pubId);
        }

        @Override
        public void permitReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, true, consumerId);
        }

        @Override
        public void denyReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, false, consumerId);
        }

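        /*
         * Shared ACL editor for the four permit/deny methods above: 'reader'
         * selects the subscriber (true) or publisher (false) ACL, and 'add'
         * selects grant (true) or revoke (false) for the given key.
         */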
        private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
                throws ConfigDbException, AccessDeniedException {
            try {
                final NsaAcl acl = NsaAclUtils.updateAcl(this, asUser, key, reader, add);
                // we have to assume we have current data, or load it again. for the expected use
                // case, assuming we can overwrite the data is fine.
                final JSONObject o = new JSONObject();
                o.put("owner", fOwner);
                o.put("readers", safeSerialize(reader ? acl : fReaders));
                o.put("writers", safeSerialize(reader ? fWriters : acl));
                fConfigDb.store(fBaseTopicData.getChild(fName), o.toString());

                log.info("ACL_UPDATE: " + asUser.getKey() + " " + (add ? "added " : "removed ")
                        + (reader ? "subscriber" : "publisher") + " " + key + " on " + fName);
            } catch (ConfigDbException | AccessDeniedException x) {
                log.error("Error when trying to update acl for key {}", key);
                throw x;
            }
        }

        private JSONObject safeSerialize(NsaAcl acl) {
            return acl == null ? null : acl.serialize();
        }

        public boolean isTransactionEnabled() {
            return fTransactionEnabled;
        }

        @Override
        public Set<String> getOwners() {
            final TreeSet<String> owners = new TreeSet<>();
            owners.add(fOwner);
            return owners;
        }
    }

}