[MR] Add support for configuring jaas.sasl.config at runtime
dmaap/messagerouter/messageservice — src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
import com.att.nsa.security.NsaApiKey;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.json.JSONArray;
import org.json.JSONObject;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.metabroker.Broker1;
import org.onap.dmaap.dmf.mr.metabroker.Topic;
import org.onap.dmaap.dmf.mr.utils.Utils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;


/**
 * Bean performing all Kafka topic metadata operations: listing, lookup,
 * creation, and deletion, plus maintaining the per-topic owner/ACL records
 * kept in the config DB.
 *
 * @author anowarul.islam
 */
//@Component
public class DMaaPKafkaMetaBroker implements Broker1 {

    public DMaaPKafkaMetaBroker() {
        fZk = null;
        fCambriaConfig = null;
        fBaseTopicData = null;
        final Properties props = new Properties();
        String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");
        if (StringUtils.isEmpty(fkafkaBrokers)) {
            fkafkaBrokers = "localhost:9092";
        }

        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);
        if (Utils.isCadiEnabled()) {
            // when CADI is enabled, layer the SASL/JAAS client settings on top of
            // the bootstrap configuration before building the admin client
            props.putAll(Utils.addSaslProps());
        }
        fKafkaAdminClient = AdminClient.create(props);
    }
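
    // A sketch of the SASL-related client properties that Utils.addSaslProps()
    // is expected to contribute when CADI is enabled. The exact values come from
    // the runtime configuration (per this change, jaas.sasl.config is set at
    // runtime); the keys shown are standard Kafka client configs:
    //
    //   sasl.jaas.config  = org.apache.kafka.common.security.plain.PlainLoginModule \
    //                       required username="..." password="...";
    //   sasl.mechanism    = PLAIN
    //   security.protocol = SASL_PLAINTEXT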

    private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaMetaBroker.class);
    private final AdminClient fKafkaAdminClient;

    /**
     * DMaaPKafkaMetaBroker constructor initializing the ZK client, config DB
     * handle, and Kafka admin client from the injected beans.
     *
     * @param settings
     * @param zk
     * @param configDb
     */
    public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
                                @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        final Properties props = new Properties();
        String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "kafka.metadata.broker.list");
        if (StringUtils.isEmpty(fkafkaBrokers)) {
            fkafkaBrokers = "localhost:9092";
        }

        if (Utils.isCadiEnabled()) {
            props.putAll(Utils.addSaslProps());
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers);

        fKafkaAdminClient = AdminClient.create(props);
    }

    /**
     * Constructor accepting a pre-built AdminClient, primarily so tests can
     * inject a mock instead of connecting to a real cluster.
     */
    public DMaaPKafkaMetaBroker(rrNvReadable settings, ZkClient zk, ConfigDb configDb, AdminClient client) {
        fZk = zk;
        fCambriaConfig = configDb;
        fBaseTopicData = configDb.parse("/topics");
        fKafkaAdminClient = client;
    }
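
    // Minimal usage sketch for the constructor above (hypothetical test code;
    // assumes Mockito is available on the test classpath):
    //
    //   AdminClient admin = Mockito.mock(AdminClient.class);
    //   DMaaPKafkaMetaBroker broker =
    //           new DMaaPKafkaMetaBroker(settings, zkClient, configDb, admin);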

    @Override
    public List<Topic> getAllTopics() throws ConfigDbException {
        log.info("Retrieving list of all the topics.");
        final LinkedList<Topic> result = new LinkedList<>();
        try {
            log.info("Retrieving all topics from root: " + zkTopicsRoot);
            final List<String> topics = fZk.getChildren(zkTopicsRoot);
            for (String topic : topics) {
                result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
            }
        } catch (ZkNoNodeException excp) {
            // a very fresh Kafka install doesn't have any topics, or a topics node
            log.error("ZK doesn't have a Kafka topics node at " + zkTopicsRoot, excp);
        }
        return result;
    }

    @Override
    public Topic getTopic(String topic) throws ConfigDbException {
        if (fZk.exists(zkTopicsRoot + "/" + topic)) {
            return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
        }
        // else: no such topic in kafka
        return null;
    }

    /**
     * Static helper returning the KafkaTopic object backed by a topic's
     * config-DB record.
     *
     * @param db
     * @param base
     * @param topic
     * @return
     * @throws ConfigDbException
     */
    public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
        return new KafkaTopic(topic, db, base);
    }

    /**
     * Creates the topic in Kafka and records the owner/description entry in
     * the config DB.
     */
    @Override
    public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
                             boolean transactionEnabled) throws TopicExistsException, CambriaApiException, ConfigDbException {
        log.info("Creating topic: " + topic);
        try {
            log.info("Check if topic [" + topic + "] exists.");
            // first check for existence "our way"
            final Topic t = getTopic(topic);
            if (t != null) {
                log.info("Could not create topic [" + topic + "]. Topic already exists.");
                throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
            }
        } catch (ConfigDbException e1) {
            log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
            throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
                    "Couldn't check topic data in config db.");
        }

        // We only allow up to 3 replicas. (Without this explicit test we get
        // weird results from the cluster, so test and fail fast.)
        if (replicas < 1 || replicas > 3) {
            log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
                    "The replica count must be between 1 and 3.");
        }
        if (partitions < 1) {
            log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
            throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
        }

        // create via the Kafka admin client; createTopics is asynchronous, so
        // block on the returned KafkaFuture to surface any failure here
        try {
            final NewTopic topicRequest = new NewTopic(topic, partitions, (short) replicas);
            final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
            final KafkaFuture<Void> ctrResult = ctr.all();
            ctrResult.get();
            // underlying Kafka topic created. now set up our API info
            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
        } catch (InterruptedException e) {
            log.warn("Execution of createTopics was interrupted.");
            Thread.currentThread().interrupt();
            throw new ConfigDbException(e);
        } catch (ExecutionException e) {
            log.warn("Execution of createTopics failed: " + e.getCause().getMessage(), e);
            throw new ConfigDbException(e.getCause());
        }
    }

    @Override
    public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException, ConfigDbException {
        log.info("Deleting topic: " + topic);
        try {
            // deletion goes through the Kafka admin client; note that the
            // returned DeleteTopicsResult is not awaited here
            fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
            log.info("Topic [" + topic + "] deletion request submitted to Kafka.");
        } catch (Exception e) {
            log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
            throw new ConfigDbException(e);
        }
    }

    private final ZkClient fZk;
    private final ConfigDb fCambriaConfig;
    private final ConfigPath fBaseTopicData;

    private static final String zkTopicsRoot = "/brokers/topics";
    private static final JSONObject kEmptyAcl = new JSONObject();

    /**
     * Creates the KafkaTopic entry for the given owner, recording whether
     * transactions are enabled for the topic.
     *
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return
     * @throws ConfigDbException
     */
    public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
            throws ConfigDbException {
        return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
    }

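    // For reference, the config-DB record written by createTopicEntry looks
    // like this (illustrative values):
    //
    //   {
    //     "owner": "someApiKey",
    //     "description": "my topic",
    //     "txenabled": false
    //   }
    //
    // KafkaTopic's constructor below reads the same fields back, so the two
    // must stay in sync.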
    /**
     * Static helper that writes the topic's owner/description/txenabled record
     * to the config DB and returns the corresponding KafkaTopic object.
     *
     * @param db
     * @param basePath
     * @param name
     * @param desc
     * @param owner
     * @param transactionEnabled
     * @return
     * @throws ConfigDbException
     */
    public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
                                              boolean transactionEnabled) throws ConfigDbException {
        final JSONObject o = new JSONObject();
        o.put("owner", owner);
        o.put("description", desc);
        o.put("txenabled", transactionEnabled);
        db.store(basePath.getChild(name), o.toString());
        return new KafkaTopic(name, db, basePath);
    }

    /**
     * Class holding a topic's config-DB record and performing all per-user
     * operations: checking whether a user may read or write, and granting or
     * revoking those permissions.
     *
     * @author anowarul.islam
     */
    public static class KafkaTopic implements Topic {
        /**
         * Constructor that loads the topic's record from the config DB.
         *
         * @param name
         * @param configdb
         * @param baseTopic
         * @throws ConfigDbException
         */
        public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
            fName = name;
            fConfigDb = configdb;
            fBaseTopicData = baseTopic;

            String data = fConfigDb.load(fBaseTopicData.getChild(fName));
            if (data == null) {
                data = "{}";
            }

            final JSONObject o = new JSONObject(data);
            fOwner = o.optString("owner", "");
            fDesc = o.optString("description", "");
            fTransactionEnabled = o.optBoolean("txenabled", false); // default value is false

            // if this topic has an owner, it needs both read/write ACLs. If there's no
            // owner (or it's empty), null is okay -- this is for existing or implicitly
            // created topics.
            JSONObject readers = o.optJSONObject("readers");
            if (readers == null && fOwner.length() > 0) {
                readers = kEmptyAcl;
            }
            fReaders = fromJson(readers);

            JSONObject writers = o.optJSONObject("writers");
            if (writers == null && fOwner.length() > 0) {
                writers = kEmptyAcl;
            }
            fWriters = fromJson(writers);
        }

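        // An ACL as stored in the topic record is a JSON object carrying an
        // "allowed" array of user keys, e.g. (illustrative):
        //
        //   { "allowed": [ "reader1", "reader2" ] }
        //
        // fromJson below tolerates a null object and returns an empty ACL.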
        private NsaAcl fromJson(JSONObject o) {
            final NsaAcl acl = new NsaAcl();
            if (o != null) {
                final JSONArray a = o.optJSONArray("allowed");
                if (a != null) {
                    for (int i = 0; i < a.length(); ++i) {
                        acl.add(a.getString(i));
                    }
                }
            }
            return acl;
        }

        @Override
        public String getName() {
            return fName;
        }

        @Override
        public String getOwner() {
            return fOwner;
        }

        @Override
        public String getDescription() {
            return fDesc;
        }

        @Override
        public NsaAcl getReaderAcl() {
            return fReaders;
        }

        @Override
        public NsaAcl getWriterAcl() {
            return fWriters;
        }

        @Override
        public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getReaderAcl(), user);
        }

        @Override
        public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
            NsaAclUtils.checkUserAccess(fOwner, getWriterAcl(), user);
        }

        @Override
        public void permitWritesFromUser(String pubId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, true, pubId);
        }

        @Override
        public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, false, false, pubId);
        }

        @Override
        public void permitReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, true, consumerId);
        }

        @Override
        public void denyReadsByUser(String consumerId, NsaApiKey asUser)
                throws ConfigDbException, AccessDeniedException {
            updateAcl(asUser, true, false, consumerId);
        }

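        // updateAcl below applies a single grant/revoke and rewrites the whole
        // topic record. Assuming NsaAcl.serialize() mirrors the "allowed" shape
        // read by fromJson, the record after, e.g., permitReadsByUser("reader1", owner)
        // would look like (illustrative):
        //
        //   {
        //     "owner": "ownerApiKey",
        //     "readers": { "allowed": [ "reader1" ] },
        //     "writers": { "allowed": [] }
        //   }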
        private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
                throws ConfigDbException, AccessDeniedException {
            final NsaAcl acl = NsaAclUtils.updateAcl(this, asUser, key, reader, add);

            // we have to assume we have current data, or load it again. for the expected use
            // case, assuming we can overwrite the data is fine.
            final JSONObject o = new JSONObject();
            o.put("owner", fOwner);
            o.put("readers", safeSerialize(reader ? acl : fReaders));
            o.put("writers", safeSerialize(reader ? fWriters : acl));
            fConfigDb.store(fBaseTopicData.getChild(fName), o.toString());

            log.info("ACL_UPDATE: " + asUser.getKey() + " " + (add ? "added " : "removed ")
                    + (reader ? "subscriber" : "publisher") + " " + key + " on " + fName);
        }

        private JSONObject safeSerialize(NsaAcl acl) {
            return acl == null ? null : acl.serialize();
        }

        private final String fName;
        private final ConfigDb fConfigDb;
        private final ConfigPath fBaseTopicData;
        private final String fOwner;
        private final String fDesc;
        private final NsaAcl fReaders;
        private final NsaAcl fWriters;
        private boolean fTransactionEnabled;

        public boolean isTransactionEnabled() {
            return fTransactionEnabled;
        }

        @Override
        public Set<String> getOwners() {
            final TreeSet<String> owners = new TreeSet<>();
            owners.add(fOwner);
            return owners;
        }
    }

}