DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaMetaBroker.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
23
24 import com.att.eelf.configuration.EELFLogger;
25 import com.att.eelf.configuration.EELFManager;
26 import com.att.nsa.configs.ConfigDb;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.configs.ConfigPath;
29 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
30 import com.att.nsa.drumlin.till.nv.rrNvReadable;
31 import com.att.nsa.security.NsaAcl;
32 import com.att.nsa.security.NsaAclUtils;
33 import com.att.nsa.security.NsaApiKey;
34 import org.I0Itec.zkclient.ZkClient;
35 import org.I0Itec.zkclient.exception.ZkNoNodeException;
36 import org.apache.kafka.clients.admin.AdminClient;
37 import org.apache.kafka.clients.admin.AdminClientConfig;
38 import org.apache.kafka.clients.admin.CreateTopicsResult;
39 import org.apache.kafka.clients.admin.NewTopic;
40 import org.apache.kafka.common.KafkaFuture;
41 import org.json.JSONArray;
42 import org.json.JSONObject;
43 import org.onap.dmaap.dmf.mr.CambriaApiException;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
46 import org.onap.dmaap.dmf.mr.metabroker.Topic;
47 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
48 import org.onap.dmaap.dmf.mr.utils.Utils;
49 import org.springframework.beans.factory.annotation.Qualifier;
50 import org.springframework.util.StringUtils;
51
52 import java.util.*;
53 import java.util.concurrent.ExecutionException;
54
55
56 /**
57  * class performing all topic operations
58  * 
59  * @author anowarul.islam
60  *
61  */
62 //@Component
63 public class DMaaPKafkaMetaBroker implements Broker1 {
64
65         public DMaaPKafkaMetaBroker() {
66                 fZk = null;
67                 fCambriaConfig = null;
68                 fBaseTopicData = null;
69                 final Properties props = new Properties ();
70                 String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
71                                 "kafka.metadata.broker.list");
72                 if (StringUtils.isEmpty(fkafkaBrokers)) {
73
74                         fkafkaBrokers = "localhost:9092";
75                 }
76                 
77              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
78              if(Utils.isCadiEnabled()){
79              props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
80                  props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
81              props.put("sasl.mechanism", "PLAIN");
82              }
83            
84              fKafkaAdminClient=AdminClient.create ( props );
85             
86         }
87
88         private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
89         private final AdminClient fKafkaAdminClient;
90         
91         
92
93         /**
94          * DMaaPKafkaMetaBroker constructor initializing
95          * 
96          * @param settings
97          * @param zk
98          * @param configDb
99          */
100         public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
101                         @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
102                 fZk = zk;
103                 fCambriaConfig = configDb;
104                 fBaseTopicData = configDb.parse("/topics");
105                 final Properties props = new Properties ();
106                 String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
107                                 "kafka.metadata.broker.list");
108                 if (null == fkafkaBrokers) {
109
110                         fkafkaBrokers = "localhost:9092";
111                 }
112                 
113                  if(Utils.isCadiEnabled()){
114                  props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
115                  props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
116              props.put("sasl.mechanism", "PLAIN");
117                  }
118              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
119              
120              fKafkaAdminClient=AdminClient.create ( props );
121             
122                 
123                 
124         }
125         
126         public DMaaPKafkaMetaBroker( rrNvReadable settings,
127                         ZkClient zk,  ConfigDb configDb,AdminClient client) {
128                 
129                 fZk = zk;
130                 fCambriaConfig = configDb;
131                 fBaseTopicData = configDb.parse("/topics");
132             fKafkaAdminClient= client;
133            
134                 
135                 
136         }
137
138         @Override
139         public List<Topic> getAllTopics() throws ConfigDbException {
140                 log.info("Retrieving list of all the topics.");
141                 final LinkedList<Topic> result = new LinkedList<>();
142                 try {
143                         log.info("Retrieving all topics from root: " + zkTopicsRoot);
144                         final List<String> topics = fZk.getChildren(zkTopicsRoot);
145                         for (String topic : topics) {
146                                 result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
147                         }
148                         JSONObject dataObj = new JSONObject();
149                         dataObj.put("topics", new JSONObject());
150
151                         for (String topic : topics) {
152                                 dataObj.getJSONObject("topics").put(topic, new JSONObject());
153                         }
154                 } catch (ZkNoNodeException excp) {
155                         // very fresh kafka doesn't have any topics or a topics node
156                         log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
157                 }
158                 return result;
159         }
160
161         @Override
162         public Topic getTopic(String topic) throws ConfigDbException {
163                 if (fZk.exists(zkTopicsRoot + "/" + topic)) {
164                         return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
165                 }
166                 // else: no such topic in kafka
167                 return null;
168         }
169
170         /**
171          * static method get KafkaTopic object
172          * 
173          * @param db
174          * @param base
175          * @param topic
176          * @return
177          * @throws ConfigDbException
178          */
179         public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
180                 return new KafkaTopic(topic, db, base);
181         }
182
183         /**
184          * creating topic
185          */
186         @Override
187         public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
188                         boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
189                 log.info("Creating topic: " + topic);
190                 try {
191                         log.info("Check if topic [" + topic + "] exist.");
192                         // first check for existence "our way"
193                         final Topic t = getTopic(topic);
194                         if (t != null) {
195                                 log.info("Could not create topic [" + topic + "]. Topic Already exists.");
196                                 throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
197                         }
198                 } catch (ConfigDbException e1) {
199                         log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
200                         throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
201                                         "Couldn't check topic data in config db.");
202                 }
203
204                 // we only allow 3 replicas. (If we don't test this, we get weird
205                 // results from the cluster,
206                 // so explicit test and fail.)
207                 if (replicas < 1 || replicas > 3) {
208                         log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
209                         throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
210                                         "The replica count must be between 1 and 3.");
211                 }
212                 if (partitions < 1) {
213                         log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
214                         throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
215                 }
216
217                 // create via kafka
218
219         try {
220             final NewTopic topicRequest =
221                     new NewTopic(topic, partitions, (short)replicas);
222             final CreateTopicsResult ctr =
223                     fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
224             final KafkaFuture<Void> ctrResult = ctr.all();
225             ctrResult.get();
226             // underlying Kafka topic created. now setup our API info
227             return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
228         } catch (InterruptedException e) {
229             log.warn("Execution of describeTopics timed out.");
230             throw new ConfigDbException(e);
231         } catch (ExecutionException e) {
232             log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
233             throw new ConfigDbException(e.getCause());
234         }
235                 
236         }
237
238         @Override
239         public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
240                 log.info("Deleting topic: " + topic);
241                 try {
242                         log.info("Loading zookeeper client for topic deletion.");
243                                         // topic creation. (Otherwise, the topic is only partially created
244                         // in ZK.)
245                         
246                         
247                         fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
248                         log.info("Zookeeper client loaded successfully. Deleting topic.");
249                         
250                 } catch (Exception e) {
251                         log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
252                         throw new ConfigDbException(e);
253                 }  finally {
254                         log.info("Closing zookeeper connection.");
255                 }
256         }
257
258         private final ZkClient fZk;
259         private final ConfigDb fCambriaConfig;
260         private final ConfigPath fBaseTopicData;
261
262         private static final String zkTopicsRoot = "/brokers/topics";
263         private static final JSONObject kEmptyAcl = new JSONObject();
264
265         /**
266          * method Providing KafkaTopic Object associated with owner and
267          * transactionenabled or not
268          * 
269          * @param name
270          * @param desc
271          * @param owner
272          * @param transactionEnabled
273          * @return
274          * @throws ConfigDbException
275          */
276         public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
277                         throws ConfigDbException {
278                 return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
279         }
280
281         /**
282          * static method giving kafka topic object
283          * 
284          * @param db
285          * @param basePath
286          * @param name
287          * @param desc
288          * @param owner
289          * @param transactionEnabled
290          * @return
291          * @throws ConfigDbException
292          */
293         public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
294                         boolean transactionEnabled) throws ConfigDbException {
295                 final JSONObject o = new JSONObject();
296                 o.put("owner", owner);
297                 o.put("description", desc);
298                 o.put("txenabled", transactionEnabled);
299                 db.store(basePath.getChild(name), o.toString());
300                 return new KafkaTopic(name, db, basePath);
301         }
302
303         /**
304          * class performing all user opearation like user is eligible to read,
305          * write. permitting a user to write and read,
306          * 
307          * @author anowarul.islam
308          *
309          */
310         public static class KafkaTopic implements Topic {
311                 /**
312                  * constructor initializes
313                  * 
314                  * @param name
315                  * @param configdb
316                  * @param baseTopic
317                  * @throws ConfigDbException
318                  */
319                 public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
320                         fName = name;
321                         fConfigDb = configdb;
322                         fBaseTopicData = baseTopic;
323
324                         String data = fConfigDb.load(fBaseTopicData.getChild(fName));
325                         if (data == null) {
326                                 data = "{}";
327                         }
328
329                         final JSONObject o = new JSONObject(data);
330                         fOwner = o.optString("owner", "");
331                         fDesc = o.optString("description", "");
332                         fTransactionEnabled = o.optBoolean("txenabled", false);// default
333                                                                                                                                         // value is
334                                                                                                                                         // false
335                         // if this topic has an owner, it needs both read/write ACLs. If there's no
336                                                 // owner (or it's empty), null is okay -- this is for existing or implicitly
337                                                 // created topics.
338                                                 JSONObject readers = o.optJSONObject ( "readers" );
339                                                 if ( readers == null && fOwner.length () > 0 )
340                                                 {
341                                                     readers = kEmptyAcl;
342                                                 }
343                                                 fReaders =  fromJson ( readers );
344
345                                                 JSONObject writers = o.optJSONObject ( "writers" );
346                                                 if ( writers == null && fOwner.length () > 0 )
347                                                 {
348                                                     writers = kEmptyAcl;
349                                                 }
350                                                 fWriters = fromJson ( writers );
351                 }
352                 
353                 private NsaAcl fromJson(JSONObject o) {
354                         NsaAcl acl = new NsaAcl();
355                         if (o != null) {
356                                 JSONArray a = o.optJSONArray("allowed");
357                                 if (a != null) {
358                                         for (int i = 0; i < a.length(); ++i) {
359                                                 String user = a.getString(i);
360                                                 acl.add(user);
361                                         }
362                                 }
363                         }
364                         return acl;
365                 }
366
367                 @Override
368                 public String getName() {
369                         return fName;
370                 }
371
372                 @Override
373                 public String getOwner() {
374                         return fOwner;
375                 }
376
377                 @Override
378                 public String getDescription() {
379                         return fDesc;
380                 }
381
382                 @Override
383                 public NsaAcl getReaderAcl() {
384                         return fReaders;
385                 }
386
387                 @Override
388                 public NsaAcl getWriterAcl() {
389                         return fWriters;
390                 }
391
392                 @Override
393                 public void checkUserRead(NsaApiKey user) throws AccessDeniedException  {
394                         NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user );
395                 }
396
397                 @Override
398                 public void checkUserWrite(NsaApiKey user) throws AccessDeniedException  {
399                         NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user );
400                 }
401
402                 @Override
403                 public void permitWritesFromUser(String pubId, NsaApiKey asUser)
404                                 throws ConfigDbException, AccessDeniedException {
405                         updateAcl(asUser, false, true, pubId);
406                 }
407
408                 @Override
409                 public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
410                         updateAcl(asUser, false, false, pubId);
411                 }
412
413                 @Override
414                 public void permitReadsByUser(String consumerId, NsaApiKey asUser)
415                                 throws ConfigDbException, AccessDeniedException {
416                         updateAcl(asUser, true, true, consumerId);
417                 }
418
419                 @Override
420                 public void denyReadsByUser(String consumerId, NsaApiKey asUser)
421                                 throws ConfigDbException, AccessDeniedException {
422                         updateAcl(asUser, true, false, consumerId);
423                 }
424
425                 private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
426                                 throws ConfigDbException, AccessDeniedException{
427                         try
428                         {
429                                 final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
430         
431                                 // we have to assume we have current data, or load it again. for the expected use
432                                 // case, assuming we can overwrite the data is fine.
433                                 final JSONObject o = new JSONObject ();
434                                 o.put ( "owner", fOwner );
435                                 o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
436                                 o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
437                                 fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
438                                 
439                                 log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
440         
441                         }
442                         catch ( ConfigDbException | AccessDeniedException x )
443                         {
444                                 throw x;
445                         }
446                         
447                 }
448
449                 private JSONObject safeSerialize(NsaAcl acl) {
450                         return acl == null ? null : acl.serialize();
451                 }
452
453                 private final String fName;
454                 private final ConfigDb fConfigDb;
455                 private final ConfigPath fBaseTopicData;
456                 private final String fOwner;
457                 private final String fDesc;
458                 private final NsaAcl fReaders;
459                 private final NsaAcl fWriters;
460                 private boolean fTransactionEnabled;
461         
462                 public boolean isTransactionEnabled() {
463                         return fTransactionEnabled;
464                 }
465
466                 @Override
467                 public Set<String> getOwners() {
468                         final TreeSet<String> owners = new TreeSet<>();
469                         owners.add ( fOwner );
470                         return owners;
471                 }
472         }
473
474 }