/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.metabroker.Broker;
import org.onap.dmaap.dmf.mr.metabroker.Broker1;
import org.onap.dmaap.dmf.mr.metabroker.Topic;
import org.onap.dmaap.dmf.mr.utils.Utils;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;

import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;

/**
 * Broker implementation performing all Kafka topic operations.
 *
 * @author anowarul.islam
 *
 */
//@Component
public class DMaaPKafkaMetaBroker implements Broker1 {
73
74         public DMaaPKafkaMetaBroker() {
75                 fZk = null;
76                 fCambriaConfig = null;
77                 fBaseTopicData = null;
78                 final Properties props = new Properties ();
79                 String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
80                                 "kafka.metadata.broker.list");
81                 if (null == fkafkaBrokers) {
82
83                         fkafkaBrokers = "localhost:9092";
84                 }
85                 
86              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
87              if(Utils.isCadiEnabled()){
88              props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
89                  props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
90              props.put("sasl.mechanism", "PLAIN");
91              }
92            
93              fKafkaAdminClient=AdminClient.create ( props );
94             
95         }
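
    // With CADI enabled, the properties assembled above amount to the following
    // (an illustrative sketch, not a literal config file; the password is elided):
    //
    //   bootstrap.servers = <kafka.metadata.broker.list, default localhost:9092>
    //   security.protocol = SASL_PLAINTEXT
    //   sasl.mechanism    = PLAIN
    //   sasl.jaas.config  = org.apache.kafka.common.security.plain.PlainLoginModule \
    //                           required username='admin' password='...';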

    private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaMetaBroker.class);
    private final AdminClient fKafkaAdminClient;

    /**
     * DMaaPKafkaMetaBroker constructor, wired from Spring configuration;
     * initializes the Kafka AdminClient.
     *
     * @param settings
     * @param zk
     * @param configDb
     */
    public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
            @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        final Properties props = new Properties();
        String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");
        if (null == fkafkaBrokers) {
            fkafkaBrokers = "localhost:9092";
        }

        if (Utils.isCadiEnabled()) {
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username='admin' password='" + Utils.getKafkaproperty() + "';");
            props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);

        fKafkaAdminClient = AdminClient.create(props);
    }

    public DMaaPKafkaMetaBroker(rrNvReadable settings, ZkClient zk, ConfigDb configDb, AdminClient client) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        fKafkaAdminClient = client;
    }
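
    // Illustrative test wiring for the constructor above (a sketch; Mockito is
    // an assumption for the example, not a dependency declared in this file):
    //
    //   AdminClient admin = Mockito.mock(AdminClient.class);
    //   DMaaPKafkaMetaBroker broker = new DMaaPKafkaMetaBroker(settings, zk, configDb, admin);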

    @Override
    public List<Topic> getAllTopics() throws ConfigDbException {
        log.info("Retrieving list of all the topics.");
        final LinkedList<Topic> result = new LinkedList<Topic>();
        try {
            log.info("Retrieving all topics from root: " + zkTopicsRoot);
            final List<String> topics = fZk.getChildren(zkTopicsRoot);
            for (String topic : topics) {
                result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
            }
        } catch (ZkNoNodeException excp) {
            // a very fresh Kafka cluster doesn't have any topics or a topics node yet
            log.error("ZK doesn't have a Kafka topics node at " + zkTopicsRoot, excp);
        }
        return result;
    }
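
    // Topic discovery above reads Kafka's broker registry in ZooKeeper; the
    // layout (standard Kafka, shown for illustration) is
    //
    //   /brokers/topics/<topicName>
    //
    // so each child node name under zkTopicsRoot is a topic name.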

    @Override
    public Topic getTopic(String topic) throws ConfigDbException {
        if (fZk.exists(zkTopicsRoot + "/" + topic)) {
            return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
        }
        // else: no such topic in kafka
        return null;
    }

    /**
     * static method to get a KafkaTopic object
     *
     * @param db
     * @param base
     * @param topic
     * @return the KafkaTopic view of the named topic
     * @throws ConfigDbException
     */
    public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
        return new KafkaTopic(topic, db, base);
    }

    /**
     * creating a topic
     */
    @Override
    public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
            boolean transactionEnabled) throws TopicExistsException, CambriaApiException, ConfigDbException {
        log.info("Creating topic: " + topic);
        try {
            log.info("Check if topic [" + topic + "] exists.");
            // first check for existence "our way"
            final Topic t = getTopic(topic);
            if (t != null) {
                log.info("Could not create topic [" + topic + "]. Topic already exists.");
                throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
            }
        } catch (ConfigDbException e1) {
            log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                    "Couldn't check topic data in config db.");
        }

        // We only allow up to 3 replicas. (If we don't test this, we get weird
        // results from the cluster, so explicit test and fail.)
        if (replicas < 1 || replicas > 3) {
            log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
                    "The replica count must be between 1 and 3.");
        }
        if (partitions < 1) {
            log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
        }

        // create via kafka
        try {
            final NewTopic topicRequest = new NewTopic(topic, partitions, (short) replicas);
            final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
            final KafkaFuture<Void> ctrResult = ctr.all();
            ctrResult.get();
            // underlying Kafka topic created. now set up our API info
            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
        } catch (InterruptedException e) {
            log.warn("Execution of createTopics was interrupted.");
            Thread.currentThread().interrupt();
            throw new ConfigDbException(e);
        } catch (ExecutionException e) {
            log.warn("Execution of createTopics failed: " + e.getCause().getMessage(), e.getCause());
            throw new ConfigDbException(e.getCause());
        }
    }
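
    // For reference, the createTopics call above has the same effect as the
    // standard Kafka CLI (an illustration; this code does not shell out):
    //
    //   kafka-topics.sh --create --topic <name> --partitions <p> \
    //       --replication-factor <r> --bootstrap-server <brokers>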

    @Override
    public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException, ConfigDbException {
        log.info("Deleting topic: " + topic);
        try {
            // deleteTopics is asynchronous; the returned result future is not
            // awaited here, so Kafka may still be completing the deletion when
            // this method returns.
            fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
            log.info("Deletion request for topic [" + topic + "] submitted to Kafka.");
        } catch (Exception e) {
            log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
            throw new ConfigDbException(e);
        }
    }

    private final ZkClient fZk;
    private final ConfigDb fCambriaConfig;
    private final ConfigPath fBaseTopicData;

    private static final String zkTopicsRoot = "/brokers/topics";
    private static final JSONObject kEmptyAcl = new JSONObject();

    /**
     * method providing the KafkaTopic object associated with an owner and a
     * transaction-enabled flag
     *
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return the stored KafkaTopic entry
     * @throws ConfigDbException
     */
    public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
            throws ConfigDbException {
        return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
    }

    /**
     * static method storing topic metadata and returning a KafkaTopic object
     *
     * @param db
     * @param basePath
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return the stored KafkaTopic entry
     * @throws ConfigDbException
     */
    public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
            boolean transactionEnabled) throws ConfigDbException {
        final JSONObject o = new JSONObject();
        o.put("owner", owner);
        o.put("description", desc);
        o.put("txenabled", transactionEnabled);
        db.store(basePath.getChild(name), o.toString());
        return new KafkaTopic(name, db, basePath);
    }
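
    // Record stored above in the config db (illustrative; "readers"/"writers"
    // ACL objects are added later by KafkaTopic.updateAcl):
    //
    //   { "owner": "<apiKey>", "description": "<desc>", "txenabled": false }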

    /**
     * class performing all user operations: checking whether a user is
     * eligible to read or write a topic, and granting or revoking those
     * permissions
     *
     * @author anowarul.islam
     *
     */
    public static class KafkaTopic implements Topic {
        /**
         * constructor initializes
         *
         * @param name
         * @param configdb
         * @param baseTopic
         * @throws ConfigDbException
         */
        public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
            fName = name;
            fConfigDb = configdb;
            fBaseTopicData = baseTopic;

            String data = fConfigDb.load(fBaseTopicData.getChild(fName));
            if (data == null) {
                data = "{}";
            }

            final JSONObject o = new JSONObject(data);
            fOwner = o.optString("owner", "");
            fDesc = o.optString("description", "");
            fTransactionEnabled = o.optBoolean("txenabled", false); // default value is false

            // if this topic has an owner, it needs both read/write ACLs. If there's no
            // owner (or it's empty), null is okay -- this is for existing or implicitly
            // created topics.
            JSONObject readers = o.optJSONObject("readers");
            if (readers == null && fOwner.length() > 0) {
                readers = kEmptyAcl;
            }
            fReaders = fromJson(readers);

            JSONObject writers = o.optJSONObject("writers");
            if (writers == null && fOwner.length() > 0) {
                writers = kEmptyAcl;
            }
            fWriters = fromJson(writers);
        }

        private NsaAcl fromJson(JSONObject o) {
            NsaAcl acl = new NsaAcl();
            if (o != null) {
                JSONArray a = o.optJSONArray("allowed");
                if (a != null) {
                    for (int i = 0; i < a.length(); ++i) {
                        String user = a.getString(i);
                        acl.add(user);
                    }
                }
            }
            return acl;
        }
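
        // ACL JSON shape consumed by fromJson above (inferred from this parser;
        // an illustration):
        //
        //   { "allowed": [ "apiKey1", "apiKey2" ] }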

        @Override
        public String getName() {
            return fName;
        }

        @Override
        public String getOwner() {
            return fOwner;
        }

        @Override
        public String getDescription() {
            return fDesc;
        }

        @Override
        public NsaAcl getReaderAcl() {
            return fReaders;
        }

        @Override
        public NsaAcl getWriterAcl() {
            return fWriters;
        }

        @Override
        public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getReaderAcl(), user);
        }

        @Override
        public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getWriterAcl(), user);
        }

        @Override
        public void permitWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, true, pubId);
        }

        @Override
        public void denyWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, false, pubId);
        }

        @Override
        public void permitReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, true, consumerId);
        }

        @Override
        public void denyReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, false, consumerId);
        }

        private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
                throws ConfigDbException, AccessDeniedException {
            final NsaAcl acl = NsaAclUtils.updateAcl(this, asUser, key, reader, add);

            // we have to assume we have current data, or load it again. for the expected use
            // case, assuming we can overwrite the data is fine.
            final JSONObject o = new JSONObject();
            o.put("owner", fOwner);
            o.put("readers", safeSerialize(reader ? acl : fReaders));
            o.put("writers", safeSerialize(reader ? fWriters : acl));
            fConfigDb.store(fBaseTopicData.getChild(fName), o.toString());

            log.info("ACL_UPDATE: " + asUser.getKey() + " " + (add ? "added " : "removed ")
                    + (reader ? "subscriber" : "publisher") + " " + key + " on " + fName);
        }
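
        // After an update, the record stored above looks like (illustrative):
        //
        //   { "owner": "<apiKey>",
        //     "readers": { "allowed": [ ... ] },
        //     "writers": { "allowed": [ ... ] } }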

        private JSONObject safeSerialize(NsaAcl acl) {
            return acl == null ? null : acl.serialize();
        }

        private final String fName;
        private final ConfigDb fConfigDb;
        private final ConfigPath fBaseTopicData;
        private final String fOwner;
        private final String fDesc;
        private final NsaAcl fReaders;
        private final NsaAcl fWriters;
        private boolean fTransactionEnabled;

        public boolean isTransactionEnabled() {
            return fTransactionEnabled;
        }

        @Override
        public Set<String> getOwners() {
            final TreeSet<String> owners = new TreeSet<String>();
            owners.add(fOwner);
            return owners;
        }
    }
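
    // Illustrative end-to-end ACL flow (a sketch; "pub1", ownerKey and someKey
    // are hypothetical):
    //
    //   KafkaTopic t = (KafkaTopic) broker.getTopic("myTopic");
    //   t.permitWritesFromUser("pub1", ownerKey);  // owner grants publish rights
    //   t.checkUserWrite(someKey);                 // throws AccessDeniedException if not permitted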
}