/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package com.att.dmf.mr.beans;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Qualifier;

import com.att.dmf.mr.CambriaApiException;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.metabroker.Broker;
import com.att.dmf.mr.metabroker.Broker1;
import com.att.dmf.mr.metabroker.Topic;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;

/**
 * Broker implementation performing all topic operations: listing, lookup,
 * creation, and deletion of Kafka topics plus their config-DB metadata.
 *
 * @author anowarul.islam
 *
 */
public class DMaaPKafkaMetaBroker implements Broker1 {

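    /**
     * Default constructor: no ZooKeeper or config DB access, but still builds
     * a Kafka AdminClient from the configured broker list.
     */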
    public DMaaPKafkaMetaBroker() {
        fZk = null;
        fCambriaConfig = null;
        fBaseTopicData = null;
        fKafkaAdminClient = buildKafkaAdminClient();
    }

    /**
     * Builds a Kafka AdminClient from the broker list configured under
     * "kafka.metadata.broker.list", falling back to localhost:9092.
     */
    private static AdminClient buildKafkaAdminClient() {
        String kafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");
        if (null == kafkaBrokers) {
            kafkaBrokers = "localhost:9092";
        }
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
        // For a SASL_PLAINTEXT-secured cluster, "sasl.jaas.config",
        // AdminClientConfig.SECURITY_PROTOCOL_CONFIG and "sasl.mechanism"
        // would also be set here.
        return AdminClient.create(props);
    }

    private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaMetaBroker.class);
    private final AdminClient fKafkaAdminClient;

    /**
     * DMaaPKafkaMetaBroker constructor for Spring, initializing the ZooKeeper
     * client, the config DB, and the Kafka AdminClient.
     *
     * @param settings property reader (currently unused)
     * @param zk ZooKeeper client used to list Kafka topics
     * @param configDb config DB holding per-topic metadata
     */
    public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
            @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        fKafkaAdminClient = buildKafkaAdminClient();
    }

    /**
     * Constructor allowing a pre-built AdminClient to be injected, e.g. from
     * tests.
     */
    public DMaaPKafkaMetaBroker(rrNvReadable settings, ZkClient zk, ConfigDb configDb, AdminClient client) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        fKafkaAdminClient = client;
    }

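    /**
     * Lists all topics known to Kafka, read from the ZooKeeper topics root.
     */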
    @Override
    public List<Topic> getAllTopics() throws ConfigDbException {
        log.info("Retrieving list of all the topics.");
        final LinkedList<Topic> result = new LinkedList<>();
        try {
            log.info("Retrieving all topics from root: " + zkTopicsRoot);
            final List<String> topics = fZk.getChildren(zkTopicsRoot);
            for (String topic : topics) {
                result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
            }
        } catch (ZkNoNodeException excp) {
            // a very fresh Kafka doesn't have any topics or a topics node yet
            log.error("ZK doesn't have a Kafka topics node at " + zkTopicsRoot, excp);
        }
        return result;
    }

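    /**
     * Looks up a single topic; returns null if Kafka (via ZooKeeper) doesn't
     * know it.
     */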
    @Override
    public Topic getTopic(String topic) throws ConfigDbException {
        if (fZk.exists(zkTopicsRoot + "/" + topic)) {
            return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
        }
        // else: no such topic in kafka
        return null;
    }

    /**
     * Static factory returning the KafkaTopic object for a topic's stored
     * configuration.
     *
     * @param db config DB to read from
     * @param base base config path for topic data
     * @param topic topic name
     * @return a KafkaTopic wrapping the stored metadata
     * @throws ConfigDbException
     */
    public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
        return new KafkaTopic(topic, db, base);
    }

    /**
     * Creates a topic in Kafka, then records its metadata in the config DB.
     */
    @Override
    public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
            boolean transactionEnabled) throws TopicExistsException, CambriaApiException, ConfigDbException {
        log.info("Creating topic: " + topic);
        try {
            log.info("Check if topic [" + topic + "] exists.");
            // first check for existence "our way"
            final Topic t = getTopic(topic);
            if (t != null) {
                log.info("Could not create topic [" + topic + "]. Topic already exists.");
                throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
            }
        } catch (ConfigDbException e1) {
            log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                    "Couldn't check topic data in config db.");
        }

        // We only allow up to 3 replicas. (If we don't test this, we get weird
        // results from the cluster, so explicit test and fail.)
        if (replicas < 1 || replicas > 3) {
            log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
                    "The replica count must be between 1 and 3.");
        }
        if (partitions < 1) {
            log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
        }

        // create via kafka
        try {
            final NewTopic topicRequest = new NewTopic(topic, partitions, (short) replicas);
            final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
            final KafkaFuture<Void> ctrResult = ctr.all();
            ctrResult.get();
            // underlying Kafka topic created; now set up our API info
            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
        } catch (InterruptedException e) {
            log.warn("Execution of createTopics was interrupted.");
            Thread.currentThread().interrupt();
            throw new ConfigDbException(e);
        } catch (ExecutionException e) {
            log.warn("Execution of createTopics failed: " + e.getCause().getMessage(), e.getCause());
            throw new ConfigDbException(e.getCause());
        }
    }

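    /**
     * Deletes the underlying Kafka topic. The topic's config DB entry is left
     * in place.
     */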
    @Override
    public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException, ConfigDbException {
        log.info("Deleting topic: " + topic);
        try {
            // Delete via the Kafka admin API and wait for the call to complete,
            // so that failures surface here instead of being dropped silently.
            fKafkaAdminClient.deleteTopics(Arrays.asList(topic)).all().get();
            log.info("Topic [" + topic + "] deleted successfully.");
        } catch (Exception e) {
            log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
            throw new ConfigDbException(e);
        }
    }

    private final ZkClient fZk;
    private final ConfigDb fCambriaConfig;
    private final ConfigPath fBaseTopicData;

    private static final String zkTopicsRoot = "/brokers/topics";
    private static final JSONObject kEmptyAcl = new JSONObject();

    /**
     * Provides the KafkaTopic object associated with an owner, with
     * transactions enabled or not.
     *
     * @param name topic name
     * @param desc topic description
     * @param owner owning API key
     * @param transactionEnabled whether the topic is transaction-enabled
     * @return the stored topic as a KafkaTopic
     * @throws ConfigDbException
     */
    public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
            throws ConfigDbException {
        return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
    }

    /**
     * Static helper that writes a topic's metadata (owner, description,
     * txenabled flag) into the config DB and returns the resulting topic.
     *
     * @param db config DB to write to
     * @param basePath base config path for topic data
     * @param name topic name
     * @param desc topic description
     * @param owner owning API key
     * @param transactionEnabled whether the topic is transaction-enabled
     * @return the stored topic as a KafkaTopic
     * @throws ConfigDbException
     */
    public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
            boolean transactionEnabled) throws ConfigDbException {
        final JSONObject o = new JSONObject();
        o.put("owner", owner);
        o.put("description", desc);
        o.put("txenabled", transactionEnabled);
        db.store(basePath.getChild(name), o.toString());
        return new KafkaTopic(name, db, basePath);
    }

    /**
     * Topic implementation backed by the config DB, performing all per-user
     * operations: checking whether a user may read or write, and permitting or
     * denying a user to read and write.
     *
     * @author anowarul.islam
     *
     */
    public static class KafkaTopic implements Topic {
        /**
         * Constructor that loads the topic's stored metadata from the config DB.
         *
         * @param name topic name
         * @param configdb config DB to read from
         * @param baseTopic base config path for topic data
         * @throws ConfigDbException
         */
        public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
            fName = name;
            fConfigDb = configdb;
            fBaseTopicData = baseTopic;

            String data = fConfigDb.load(fBaseTopicData.getChild(fName));
            if (data == null) {
                data = "{}";
            }

            final JSONObject o = new JSONObject(data);
            fOwner = o.optString("owner", "");
            fDesc = o.optString("description", "");
            fTransactionEnabled = o.optBoolean("txenabled", false); // default value is false

            // if this topic has an owner, it needs both read/write ACLs. If there's no
            // owner (or it's empty), null is okay -- this is for existing or implicitly
            // created topics.
            JSONObject readers = o.optJSONObject("readers");
            if (readers == null && fOwner.length() > 0) {
                readers = kEmptyAcl;
            }
            fReaders = fromJson(readers);

            JSONObject writers = o.optJSONObject("writers");
            if (writers == null && fOwner.length() > 0) {
                writers = kEmptyAcl;
            }
            fWriters = fromJson(writers);
        }

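        /**
         * Builds an NsaAcl from its JSON form: an object with an "allowed"
         * array of user keys. A null input yields an empty ACL.
         */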
        private NsaAcl fromJson(JSONObject o) {
            NsaAcl acl = new NsaAcl();
            if (o != null) {
                JSONArray a = o.optJSONArray("allowed");
                if (a != null) {
                    for (int i = 0; i < a.length(); ++i) {
                        String user = a.getString(i);
                        acl.add(user);
                    }
                }
            }
            return acl;
        }

        @Override
        public String getName() {
            return fName;
        }

        @Override
        public String getOwner() {
            return fOwner;
        }

        @Override
        public String getDescription() {
            return fDesc;
        }

        @Override
        public NsaAcl getReaderAcl() {
            return fReaders;
        }

        @Override
        public NsaAcl getWriterAcl() {
            return fWriters;
        }

        @Override
        public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getReaderAcl(), user);
        }

        @Override
        public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getWriterAcl(), user);
        }

        @Override
        public void permitWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, true, pubId);
        }

        @Override
        public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, false, pubId);
        }

        @Override
        public void permitReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, true, consumerId);
        }

        @Override
        public void denyReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, false, consumerId);
        }

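        /**
         * Adds or removes a key on the reader or writer ACL, then persists
         * both ACLs back to the config DB.
         */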
        private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
                throws ConfigDbException, AccessDeniedException {
            final NsaAcl acl = NsaAclUtils.updateAcl(this, asUser, key, reader, add);

            // we have to assume we have current data, or load it again. for the
            // expected use case, assuming we can overwrite the data is fine.
            final JSONObject o = new JSONObject();
            o.put("owner", fOwner);
            o.put("readers", safeSerialize(reader ? acl : fReaders));
            o.put("writers", safeSerialize(reader ? fWriters : acl));
            fConfigDb.store(fBaseTopicData.getChild(fName), o.toString());

            log.info("ACL_UPDATE: " + asUser.getKey() + " " + (add ? "added " : "removed ")
                    + (reader ? "subscriber" : "publisher") + " " + key + " on " + fName);
        }

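        /**
         * Serializes an ACL to JSON, tolerating (and returning) null.
         */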
        private JSONObject safeSerialize(NsaAcl acl) {
            return acl == null ? null : acl.serialize();
        }

        private final String fName;
        private final ConfigDb fConfigDb;
        private final ConfigPath fBaseTopicData;
        private final String fOwner;
        private final String fDesc;
        private final NsaAcl fReaders;
        private final NsaAcl fWriters;
        private boolean fTransactionEnabled;

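        /**
         * @return whether this topic was created with transactions enabled
         */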
        public boolean isTransactionEnabled() {
            return fTransactionEnabled;
        }

        @Override
        public Set<String> getOwners() {
            final TreeSet<String> owners = new TreeSet<>();
            owners.add(fOwner);
            return owners;
        }
    }

}