Fix sonar issues in messagerouter/msgrtr
dmaap/messagerouter/msgrtr.git: src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.StringUtils;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.metabroker.Broker1;
import org.onap.dmaap.dmf.mr.metabroker.Topic;
import org.onap.dmaap.dmf.mr.utils.Utils;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;

/**
 * Class performing all topic operations: lookup, creation, deletion, and ACL
 * management against Kafka and the Cambria config DB.
 *
 * @author anowarul.islam
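 *
 * <p>A minimal usage sketch; the identifiers below are illustrative, not part
 * of this API:
 * <pre>{@code
 * Broker1 broker = new DMaaPKafkaMetaBroker(settings, zkClient, configDb);
 * Topic created = broker.createTopic("example.topic", "demo topic",
 *         "ownerApiKey", 3, 3, false);
 * Topic fetched = broker.getTopic("example.topic");
 * }</pre>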
 */
public class DMaaPKafkaMetaBroker implements Broker1 {

    /**
     * Default constructor: builds a Kafka AdminClient from the configured
     * broker list, falling back to localhost:9092.
     */
    public DMaaPKafkaMetaBroker() {
        fZk = null;
        fCambriaConfig = null;
        fBaseTopicData = null;
        final Properties props = new Properties();
        String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");
        if (StringUtils.isEmpty(fkafkaBrokers)) {
            fkafkaBrokers = "localhost:9092";
        }

        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);
        if (Utils.isCadiEnabled()) {
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"
                    + Utils.getKafkaproperty() + "';");
            props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
        }

        fKafkaAdminClient = AdminClient.create(props);
    }
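
    /*
     * For orientation, a sketch of the AdminClient properties the constructors
     * in this class assemble when CADI is enabled (values are illustrative):
     *
     *   bootstrap.servers = localhost:9092
     *   security.protocol = SASL_PLAINTEXT
     *   sasl.mechanism    = PLAIN
     *   sasl.jaas.config  = org.apache.kafka.common.security.plain.PlainLoginModule
     *                       required username='admin' password='<from Utils.getKafkaproperty()>';
     */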

    private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaMetaBroker.class);
    private final AdminClient fKafkaAdminClient;

    /**
     * DMaaPKafkaMetaBroker constructor initializing the ZooKeeper client, the
     * config DB, and a Kafka AdminClient.
     *
     * @param settings
     * @param zk
     * @param configDb
     */
    public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
            @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        final Properties props = new Properties();
        String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");
        if (StringUtils.isEmpty(fkafkaBrokers)) {
            fkafkaBrokers = "localhost:9092";
        }

        if (Utils.isCadiEnabled()) {
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"
                    + Utils.getKafkaproperty() + "';");
            props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);

        fKafkaAdminClient = AdminClient.create(props);
    }

    /**
     * Constructor accepting a pre-built AdminClient.
     */
    public DMaaPKafkaMetaBroker(rrNvReadable settings, ZkClient zk, ConfigDb configDb, AdminClient client) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        fKafkaAdminClient = client;
    }

    @Override
    public List<Topic> getAllTopics() throws ConfigDbException {
        log.info("Retrieving list of all the topics.");
        final LinkedList<Topic> result = new LinkedList<>();
        try {
            log.info("Retrieving all topics from root: " + zkTopicsRoot);
            final List<String> topics = fZk.getChildren(zkTopicsRoot);
            for (String topic : topics) {
                result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
            }
        } catch (ZkNoNodeException excp) {
            // a very fresh Kafka install doesn't have any topics, or a topics node
            log.error("ZK doesn't have a Kafka topics node at " + zkTopicsRoot, excp);
        }
        return result;
    }
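
    /*
     * Topic discovery above relies on Kafka's standard ZooKeeper layout, where
     * each topic is a child node of /brokers/topics, e.g. (illustrative names):
     *
     *   /brokers/topics/example.topic
     *   /brokers/topics/another.topic
     */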

    @Override
    public Topic getTopic(String topic) throws ConfigDbException {
        if (fZk.exists(zkTopicsRoot + "/" + topic)) {
            return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
        }
        // else: no such topic in Kafka
        return null;
    }

    /**
     * Static helper returning the KafkaTopic object for a topic.
     *
     * @param db
     * @param base
     * @param topic
     * @return
     * @throws ConfigDbException
     */
    public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
        return new KafkaTopic(topic, db, base);
    }

    /**
     * Creates a topic in Kafka, then records its metadata in the config DB.
     */
    @Override
    public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
            boolean transactionEnabled) throws TopicExistsException, CambriaApiException, ConfigDbException {
        log.info("Creating topic: " + topic);
        try {
            log.info("Check if topic [" + topic + "] exists.");
            // first check for existence "our way"
            final Topic t = getTopic(topic);
            if (t != null) {
                log.info("Could not create topic [" + topic + "]. Topic already exists.");
                throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
            }
        } catch (ConfigDbException e1) {
            log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                    "Couldn't check topic data in config db.");
        }

        // We only allow 3 replicas. (If we don't test this, we get weird results
        // from the cluster, so explicitly test and fail.)
        if (replicas < 1 || replicas > 3) {
            log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
                    "The replica count must be between 1 and 3.");
        }
        if (partitions < 1) {
            log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
        }

        // create via Kafka
        try {
            final NewTopic topicRequest = new NewTopic(topic, partitions, (short) replicas);
            final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
            final KafkaFuture<Void> ctrResult = ctr.all();
            ctrResult.get();
            // underlying Kafka topic created; now set up our API info
            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
        } catch (InterruptedException e) {
            log.warn("Execution of createTopics was interrupted.");
            Thread.currentThread().interrupt();
            throw new ConfigDbException(e);
        } catch (ExecutionException e) {
            log.warn("Execution of createTopics failed: " + e.getCause().getMessage(), e);
            throw new ConfigDbException(e.getCause());
        }
    }
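
    /*
     * The createTopics call above is roughly the programmatic equivalent of the
     * standard Kafka CLI, shown here only as an orientation aid:
     *
     *   kafka-topics.sh --bootstrap-server <brokers> --create --topic <name> \
     *       --partitions <partitions> --replication-factor <replicas>
     */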

    @Override
    public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException, ConfigDbException {
        log.info("Deleting topic: " + topic);
        try {
            // delete the underlying Kafka topic via the admin client
            fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
            log.info("Topic [" + topic + "] deletion submitted to Kafka.");
        } catch (Exception e) {
            log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
            throw new ConfigDbException(e);
        }
    }
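
    /*
     * Note that deleteTopics returns a future that is not awaited here, so the
     * deletion completes asynchronously on the broker. The rough CLI analogue:
     *
     *   kafka-topics.sh --bootstrap-server <brokers> --delete --topic <name>
     */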

    private final ZkClient fZk;
    private final ConfigDb fCambriaConfig;
    private final ConfigPath fBaseTopicData;

    private static final String zkTopicsRoot = "/brokers/topics";
    private static final JSONObject kEmptyAcl = new JSONObject();

    /**
     * Creates the KafkaTopic config-DB entry, recording the owner and whether
     * transactions are enabled.
     *
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return
     * @throws ConfigDbException
     */
    public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
            throws ConfigDbException {
        return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
    }

    /**
     * Static method storing topic metadata in the config DB and returning the
     * resulting KafkaTopic object.
     *
     * @param db
     * @param basePath
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return
     * @throws ConfigDbException
     */
    public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
            boolean transactionEnabled) throws ConfigDbException {
        final JSONObject o = new JSONObject();
        o.put("owner", owner);
        o.put("description", desc);
        o.put("txenabled", transactionEnabled);
        db.store(basePath.getChild(name), o.toString());
        return new KafkaTopic(name, db, basePath);
    }
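
    /*
     * The entry stored above is a small JSON document keyed by topic name under
     * the /topics base path, e.g. (illustrative values):
     *
     *   {"owner":"ownerApiKey","description":"demo topic","txenabled":false}
     */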

    /**
     * Class performing all user operations, such as checking whether a user may
     * read or write, and granting or revoking read/write permission.
     *
     * @author anowarul.islam
     */
    public static class KafkaTopic implements Topic {
        /**
         * Constructor initializing the topic from its config-DB entry.
         *
         * @param name
         * @param configdb
         * @param baseTopic
         * @throws ConfigDbException
         */
        public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
            fName = name;
            fConfigDb = configdb;
            fBaseTopicData = baseTopic;

            String data = fConfigDb.load(fBaseTopicData.getChild(fName));
            if (data == null) {
                data = "{}";
            }

            final JSONObject o = new JSONObject(data);
            fOwner = o.optString("owner", "");
            fDesc = o.optString("description", "");
            fTransactionEnabled = o.optBoolean("txenabled", false); // default value is false

            // If this topic has an owner, it needs both read/write ACLs. If there's no
            // owner (or it's empty), null is okay -- this is for existing or implicitly
            // created topics.
            JSONObject readers = o.optJSONObject("readers");
            if (readers == null && fOwner.length() > 0) {
                readers = kEmptyAcl;
            }
            fReaders = fromJson(readers);

            JSONObject writers = o.optJSONObject("writers");
            if (writers == null && fOwner.length() > 0) {
                writers = kEmptyAcl;
            }
            fWriters = fromJson(writers);
        }
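
        /*
         * For reference, a topic entry carrying ACLs looks roughly like this
         * (illustrative identities):
         *
         *   {
         *     "owner": "ownerApiKey",
         *     "description": "demo topic",
         *     "txenabled": false,
         *     "readers": { "allowed": ["consumer-1"] },
         *     "writers": { "allowed": ["publisher-1"] }
         *   }
         */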

        private NsaAcl fromJson(JSONObject o) {
            NsaAcl acl = new NsaAcl();
            if (o != null) {
                JSONArray a = o.optJSONArray("allowed");
                if (a != null) {
                    for (int i = 0; i < a.length(); ++i) {
                        String user = a.getString(i);
                        acl.add(user);
                    }
                }
            }
            return acl;
        }

        @Override
        public String getName() {
            return fName;
        }

        @Override
        public String getOwner() {
            return fOwner;
        }

        @Override
        public String getDescription() {
            return fDesc;
        }

        @Override
        public NsaAcl getReaderAcl() {
            return fReaders;
        }

        @Override
        public NsaAcl getWriterAcl() {
            return fWriters;
        }

        @Override
        public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getReaderAcl(), user);
        }

        @Override
        public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getWriterAcl(), user);
        }

        @Override
        public void permitWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, true, pubId);
        }

        @Override
        public void denyWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, false, pubId);
        }

        @Override
        public void permitReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, true, consumerId);
        }

        @Override
        public void denyReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, false, consumerId);
        }

        private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
                throws ConfigDbException, AccessDeniedException {
            final NsaAcl acl = NsaAclUtils.updateAcl(this, asUser, key, reader, add);

            // We have to assume we have current data, or load it again. For the
            // expected use case, assuming we can overwrite the data is fine.
            final JSONObject o = new JSONObject();
            o.put("owner", fOwner);
            o.put("readers", safeSerialize(reader ? acl : fReaders));
            o.put("writers", safeSerialize(reader ? fWriters : acl));
            fConfigDb.store(fBaseTopicData.getChild(fName), o.toString());

            log.info("ACL_UPDATE: " + asUser.getKey() + " " + (add ? "added " : "removed ")
                    + (reader ? "subscriber" : "publisher") + " " + key + " on " + fName);
        }
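
        /*
         * A usage sketch (identifiers are illustrative): granting read access
         * routes through updateAcl and persists the merged ACLs:
         *
         *   topic.permitReadsByUser("consumer-1", ownerApiKey);
         *   // -> updateAcl(ownerApiKey, true, true, "consumer-1"), after which
         *   //    the updated JSON is stored under the topic's config-DB node.
         */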

        private JSONObject safeSerialize(NsaAcl acl) {
            return acl == null ? null : acl.serialize();
        }

        private final String fName;
        private final ConfigDb fConfigDb;
        private final ConfigPath fBaseTopicData;
        private final String fOwner;
        private final String fDesc;
        private final NsaAcl fReaders;
        private final NsaAcl fWriters;
        private boolean fTransactionEnabled;

        public boolean isTransactionEnabled() {
            return fTransactionEnabled;
        }

        @Override
        public Set<String> getOwners() {
            final TreeSet<String> owners = new TreeSet<>();
            owners.add(fOwner);
            return owners;
        }
    }

}