1e20ee2a6d271ed154df91fb7e670406fc84034b
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaMetaBroker.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
23
24 import java.util.Arrays;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Properties;
28 import java.util.Set;
29 import java.util.TreeSet;
30 import java.util.concurrent.ExecutionException;
31
32 import org.I0Itec.zkclient.ZkClient;
33 import org.I0Itec.zkclient.exception.ZkNoNodeException;
34 import org.apache.kafka.clients.admin.AdminClient;
35 import org.apache.kafka.clients.admin.AdminClientConfig;
36 import org.apache.kafka.clients.admin.CreateTopicsResult;
37 import org.apache.kafka.clients.admin.NewTopic;
38 import org.apache.kafka.common.KafkaFuture;
39 import org.json.JSONObject;
40 import org.json.JSONArray;
41 import org.springframework.beans.factory.annotation.Qualifier;
42
43 import org.onap.dmaap.dmf.mr.CambriaApiException;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
46 import org.onap.dmaap.dmf.mr.metabroker.Topic;
47 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
48 import org.onap.dmaap.dmf.mr.utils.Utils;
49 import com.att.eelf.configuration.EELFLogger;
50 import com.att.eelf.configuration.EELFManager;
51
52 import com.att.nsa.configs.ConfigDb;
53 import com.att.nsa.configs.ConfigDbException;
54 import com.att.nsa.configs.ConfigPath;
55 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
56 import com.att.nsa.drumlin.till.nv.rrNvReadable;
57 import com.att.nsa.security.NsaAcl;
58 import com.att.nsa.security.NsaAclUtils;
59 import com.att.nsa.security.NsaApiKey;
60
61
/**
 * Broker implementation performing all Kafka topic operations — listing,
 * lookup, creation, and deletion — along with the associated per-topic
 * metadata bookkeeping in the config db.
 *
 * @author anowarul.islam
 */
68 //@Component
69 public class DMaaPKafkaMetaBroker implements Broker1 {
70
71         public DMaaPKafkaMetaBroker() {
72                 fZk = null;
73                 fCambriaConfig = null;
74                 fBaseTopicData = null;
75                 final Properties props = new Properties ();
76                 String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
77                                 "kafka.metadata.broker.list");
78                 if (null == fkafkaBrokers) {
79
80                         fkafkaBrokers = "localhost:9092";
81                 }
82                 
83              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
84              if(Utils.isCadiEnabled()){
85              props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
86                  props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
87              props.put("sasl.mechanism", "PLAIN");
88              }
89            
90              fKafkaAdminClient=AdminClient.create ( props );
91             
92         }
93
94         private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
95         private final AdminClient fKafkaAdminClient;
96         
97         
98
99         /**
100          * DMaaPKafkaMetaBroker constructor initializing
101          * 
102          * @param settings
103          * @param zk
104          * @param configDb
105          */
106         public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
107                         @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
108                 fZk = zk;
109                 fCambriaConfig = configDb;
110                 fBaseTopicData = configDb.parse("/topics");
111                 final Properties props = new Properties ();
112                 String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
113                                 "kafka.metadata.broker.list");
114                 if (null == fkafkaBrokers) {
115
116                         fkafkaBrokers = "localhost:9092";
117                 }
118                 
119                  if(Utils.isCadiEnabled()){
120                  props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
121                  props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
122              props.put("sasl.mechanism", "PLAIN");
123                  }
124              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
125              
126              fKafkaAdminClient=AdminClient.create ( props );
127             
128                 
129                 
130         }
131         
132         public DMaaPKafkaMetaBroker( rrNvReadable settings,
133                         ZkClient zk,  ConfigDb configDb,AdminClient client) {
134                 
135                 fZk = zk;
136                 fCambriaConfig = configDb;
137                 fBaseTopicData = configDb.parse("/topics");
138             fKafkaAdminClient= client;
139            
140                 
141                 
142         }
143
144         @Override
145         public List<Topic> getAllTopics() throws ConfigDbException {
146                 log.info("Retrieving list of all the topics.");
147                 final LinkedList<Topic> result = new LinkedList<>();
148                 try {
149                         log.info("Retrieving all topics from root: " + zkTopicsRoot);
150                         final List<String> topics = fZk.getChildren(zkTopicsRoot);
151                         for (String topic : topics) {
152                                 result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
153                         }
154                         JSONObject dataObj = new JSONObject();
155                         dataObj.put("topics", new JSONObject());
156
157                         for (String topic : topics) {
158                                 dataObj.getJSONObject("topics").put(topic, new JSONObject());
159                         }
160                 } catch (ZkNoNodeException excp) {
161                         // very fresh kafka doesn't have any topics or a topics node
162                         log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
163                 }
164                 return result;
165         }
166
167         @Override
168         public Topic getTopic(String topic) throws ConfigDbException {
169                 if (fZk.exists(zkTopicsRoot + "/" + topic)) {
170                         return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
171                 }
172                 // else: no such topic in kafka
173                 return null;
174         }
175
	/**
	 * Builds a {@link KafkaTopic} view of the named topic backed by the given
	 * config store.
	 *
	 * @param db    config store holding topic records
	 * @param base  parent config path for topic records
	 * @param topic topic name
	 * @return a KafkaTopic wrapper over the topic's stored metadata
	 * @throws ConfigDbException if the topic record cannot be loaded
	 */
	public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
		return new KafkaTopic(topic, db, base);
	}
188
189         /**
190          * creating topic
191          */
192         @Override
193         public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
194                         boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
195                 log.info("Creating topic: " + topic);
196                 try {
197                         log.info("Check if topic [" + topic + "] exist.");
198                         // first check for existence "our way"
199                         final Topic t = getTopic(topic);
200                         if (t != null) {
201                                 log.info("Could not create topic [" + topic + "]. Topic Already exists.");
202                                 throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
203                         }
204                 } catch (ConfigDbException e1) {
205                         log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
206                         throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
207                                         "Couldn't check topic data in config db.");
208                 }
209
210                 // we only allow 3 replicas. (If we don't test this, we get weird
211                 // results from the cluster,
212                 // so explicit test and fail.)
213                 if (replicas < 1 || replicas > 3) {
214                         log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
215                         throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
216                                         "The replica count must be between 1 and 3.");
217                 }
218                 if (partitions < 1) {
219                         log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
220                         throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
221                 }
222
223                 // create via kafka
224                 
225                         try
226                         {
227                                 final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() );
228                                 final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
229                                 final KafkaFuture<Void> ctrResult = ctr.all ();
230                                 ctrResult.get ();
231                                 // underlying Kafka topic created. now setup our API info
232                                 return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
233                         }
234                         catch ( InterruptedException e )
235                         {
236                                 
237                                 log.warn ( "Execution of describeTopics timed out." );
238                                 throw new ConfigDbException ( e );
239                         }
240                         catch ( ExecutionException e )
241                         {
242                                 
243                                 log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
244                                 throw new ConfigDbException ( e.getCause () );
245                         }
246                 
247         }
248
249         @Override
250         public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
251                 log.info("Deleting topic: " + topic);
252                 try {
253                         log.info("Loading zookeeper client for topic deletion.");
254                                         // topic creation. (Otherwise, the topic is only partially created
255                         // in ZK.)
256                         
257                         
258                         fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
259                         log.info("Zookeeper client loaded successfully. Deleting topic.");
260                         
261                 } catch (Exception e) {
262                         log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
263                         throw new ConfigDbException(e);
264                 }  finally {
265                         log.info("Closing zookeeper connection.");
266                 }
267         }
268
	// ZooKeeper client used to enumerate Kafka's topic nodes (null when
	// constructed via the no-arg constructor).
	private final ZkClient fZk;
	// Config store holding per-topic API metadata.
	private final ConfigDb fCambriaConfig;
	// Parsed config path for the "/topics" subtree (null for no-arg constructor).
	private final ConfigPath fBaseTopicData;

	// ZooKeeper path under which Kafka registers its topics.
	private static final String zkTopicsRoot = "/brokers/topics";
	// Sentinel empty ACL used when an owned topic has no reader/writer list yet.
	private static final JSONObject kEmptyAcl = new JSONObject();
275
	/**
	 * Stores this broker's config-db record for a topic, associating the owner
	 * and transaction flag, using the broker's own config store and base path.
	 *
	 * @param name               topic name
	 * @param desc               human-readable description
	 * @param owner              owner API key
	 * @param transactionEnabled whether transactions are enabled on the topic
	 * @return a KafkaTopic wrapper over the stored record
	 * @throws ConfigDbException if the record cannot be stored
	 */
	public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
			throws ConfigDbException {
		return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
	}
291
292         /**
293          * static method giving kafka topic object
294          * 
295          * @param db
296          * @param basePath
297          * @param name
298          * @param desc
299          * @param owner
300          * @param transactionEnabled
301          * @return
302          * @throws ConfigDbException
303          */
304         public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
305                         boolean transactionEnabled) throws ConfigDbException {
306                 final JSONObject o = new JSONObject();
307                 o.put("owner", owner);
308                 o.put("description", desc);
309                 o.put("txenabled", transactionEnabled);
310                 db.store(basePath.getChild(name), o.toString());
311                 return new KafkaTopic(name, db, basePath);
312         }
313
314         /**
315          * class performing all user opearation like user is eligible to read,
316          * write. permitting a user to write and read,
317          * 
318          * @author anowarul.islam
319          *
320          */
321         public static class KafkaTopic implements Topic {
322                 /**
323                  * constructor initializes
324                  * 
325                  * @param name
326                  * @param configdb
327                  * @param baseTopic
328                  * @throws ConfigDbException
329                  */
330                 public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
331                         fName = name;
332                         fConfigDb = configdb;
333                         fBaseTopicData = baseTopic;
334
335                         String data = fConfigDb.load(fBaseTopicData.getChild(fName));
336                         if (data == null) {
337                                 data = "{}";
338                         }
339
340                         final JSONObject o = new JSONObject(data);
341                         fOwner = o.optString("owner", "");
342                         fDesc = o.optString("description", "");
343                         fTransactionEnabled = o.optBoolean("txenabled", false);// default
344                                                                                                                                         // value is
345                                                                                                                                         // false
346                         // if this topic has an owner, it needs both read/write ACLs. If there's no
347                                                 // owner (or it's empty), null is okay -- this is for existing or implicitly
348                                                 // created topics.
349                                                 JSONObject readers = o.optJSONObject ( "readers" );
350                                                 if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
351                                                 fReaders =  fromJson ( readers );
352
353                                                 JSONObject writers = o.optJSONObject ( "writers" );
354                                                 if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
355                                                 fWriters = fromJson ( writers );
356                 }
357                 
358                 private NsaAcl fromJson(JSONObject o) {
359                         NsaAcl acl = new NsaAcl();
360                         if (o != null) {
361                                 JSONArray a = o.optJSONArray("allowed");
362                                 if (a != null) {
363                                         for (int i = 0; i < a.length(); ++i) {
364                                                 String user = a.getString(i);
365                                                 acl.add(user);
366                                         }
367                                 }
368                         }
369                         return acl;
370                 }
371
372                 @Override
373                 public String getName() {
374                         return fName;
375                 }
376
377                 @Override
378                 public String getOwner() {
379                         return fOwner;
380                 }
381
382                 @Override
383                 public String getDescription() {
384                         return fDesc;
385                 }
386
387                 @Override
388                 public NsaAcl getReaderAcl() {
389                         return fReaders;
390                 }
391
392                 @Override
393                 public NsaAcl getWriterAcl() {
394                         return fWriters;
395                 }
396
397                 @Override
398                 public void checkUserRead(NsaApiKey user) throws AccessDeniedException  {
399                         NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user );
400                 }
401
402                 @Override
403                 public void checkUserWrite(NsaApiKey user) throws AccessDeniedException  {
404                         NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user );
405                 }
406
407                 @Override
408                 public void permitWritesFromUser(String pubId, NsaApiKey asUser)
409                                 throws ConfigDbException, AccessDeniedException {
410                         updateAcl(asUser, false, true, pubId);
411                 }
412
413                 @Override
414                 public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
415                         updateAcl(asUser, false, false, pubId);
416                 }
417
418                 @Override
419                 public void permitReadsByUser(String consumerId, NsaApiKey asUser)
420                                 throws ConfigDbException, AccessDeniedException {
421                         updateAcl(asUser, true, true, consumerId);
422                 }
423
424                 @Override
425                 public void denyReadsByUser(String consumerId, NsaApiKey asUser)
426                                 throws ConfigDbException, AccessDeniedException {
427                         updateAcl(asUser, true, false, consumerId);
428                 }
429
430                 private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
431                                 throws ConfigDbException, AccessDeniedException{
432                         try
433                         {
434                                 final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
435         
436                                 // we have to assume we have current data, or load it again. for the expected use
437                                 // case, assuming we can overwrite the data is fine.
438                                 final JSONObject o = new JSONObject ();
439                                 o.put ( "owner", fOwner );
440                                 o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
441                                 o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
442                                 fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
443                                 
444                                 log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
445         
446                         }
447                         catch ( ConfigDbException x )
448                         {
449                                 throw x;
450                         }
451                         catch ( AccessDeniedException x )
452                         {
453                                 throw x;
454                         }
455                         
456                 }
457
458                 private JSONObject safeSerialize(NsaAcl acl) {
459                         return acl == null ? null : acl.serialize();
460                 }
461
462                 private final String fName;
463                 private final ConfigDb fConfigDb;
464                 private final ConfigPath fBaseTopicData;
465                 private final String fOwner;
466                 private final String fDesc;
467                 private final NsaAcl fReaders;
468                 private final NsaAcl fWriters;
469                 private boolean fTransactionEnabled;
470         
471                 public boolean isTransactionEnabled() {
472                         return fTransactionEnabled;
473                 }
474
475                 @Override
476                 public Set<String> getOwners() {
477                         final TreeSet<String> owners = new TreeSet<>();
478                         owners.add ( fOwner );
479                         return owners;
480                 }
481         }
482
483 }