[MR] Add support for configuring jaas.sasl.config at runtime
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaMetaBroker.java
index 9ab4c83..7a08345 100644 (file)
@@ -8,16 +8,16 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
-*  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *  
+ *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
 package org.onap.dmaap.dmf.mr.beans;
 
@@ -55,7 +55,7 @@ import java.util.concurrent.ExecutionException;
 
 /**
  * class performing all topic operations
- * 
+ *
  * @author anowarul.islam
  *
  */
@@ -73,32 +73,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                        fkafkaBrokers = "localhost:9092";
                }
-               
-            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-            if(Utils.isCadiEnabled()){
-            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
-                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
-            props.put("sasl.mechanism", "PLAIN");
-            }
-          
-            fKafkaAdminClient=AdminClient.create ( props );
-           
+
+               props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+               if(Utils.isCadiEnabled()){
+                       props.putAll(Utils.addSaslProps());
+               }
+               fKafkaAdminClient=AdminClient.create ( props );
+
        }
 
        private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
        private final AdminClient fKafkaAdminClient;
-       
-       
+
+
 
        /**
         * DMaaPKafkaMetaBroker constructor initializing
-        * 
+        *
         * @param settings
         * @param zk
         * @param configDb
         */
        public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
-                       @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+                                                               @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
@@ -109,30 +106,28 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                        fkafkaBrokers = "localhost:9092";
                }
-               
-                if(Utils.isCadiEnabled()){
-                props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
-                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
-            props.put("sasl.mechanism", "PLAIN");
-                }
-            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-            
-            fKafkaAdminClient=AdminClient.create ( props );
-           
-               
-               
+
+               if(Utils.isCadiEnabled()){
+                       props.putAll(Utils.addSaslProps());
+               }
+               props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
+               fKafkaAdminClient=AdminClient.create ( props );
+
+
+
        }
-       
+
        public DMaaPKafkaMetaBroker( rrNvReadable settings,
-                       ZkClient zk,  ConfigDb configDb,AdminClient client) {
-               
+                                                                ZkClient zk,  ConfigDb configDb,AdminClient client) {
+
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
-           fKafkaAdminClient= client;
-          
-               
-               
+               fKafkaAdminClient= client;
+
+
+
        }
 
        @Override
@@ -169,7 +164,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
        /**
         * static method get KafkaTopic object
-        * 
+        *
         * @param db
         * @param base
         * @param topic
@@ -185,7 +180,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         */
        @Override
        public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
-                       boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+                                                        boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
                log.info("Creating topic: " + topic);
                try {
                        log.info("Check if topic [" + topic + "] exist.");
@@ -216,23 +211,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                // create via kafka
 
-        try {
-            final NewTopic topicRequest =
-                    new NewTopic(topic, partitions, (short)replicas);
-            final CreateTopicsResult ctr =
-                    fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
-            final KafkaFuture<Void> ctrResult = ctr.all();
-            ctrResult.get();
-            // underlying Kafka topic created. now setup our API info
-            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
-        } catch (InterruptedException e) {
-            log.warn("Execution of describeTopics timed out.");
-            throw new ConfigDbException(e);
-        } catch (ExecutionException e) {
-            log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
-            throw new ConfigDbException(e.getCause());
-        }
-               
+               try {
+                       final NewTopic topicRequest =
+                                       new NewTopic(topic, partitions, (short)replicas);
+                       final CreateTopicsResult ctr =
+                                       fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+                       final KafkaFuture<Void> ctrResult = ctr.all();
+                       ctrResult.get();
+                       // underlying Kafka topic created. now setup our API info
+                       return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+               } catch (InterruptedException e) {
+                       log.warn("Execution of describeTopics timed out.");
+                       throw new ConfigDbException(e);
+               } catch (ExecutionException e) {
+                       log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+                       throw new ConfigDbException(e.getCause());
+               }
+
        }
 
        @Override
@@ -240,13 +235,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                log.info("Deleting topic: " + topic);
                try {
                        log.info("Loading zookeeper client for topic deletion.");
-                                       // topic creation. (Otherwise, the topic is only partially created
+                       // topic creation. (Otherwise, the topic is only partially created
                        // in ZK.)
-                       
-                       
+
+
                        fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
                        log.info("Zookeeper client loaded successfully. Deleting topic.");
-                       
+
                } catch (Exception e) {
                        log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
                        throw new ConfigDbException(e);
@@ -265,7 +260,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
        /**
         * method Providing KafkaTopic Object associated with owner and
         * transactionenabled or not
-        * 
+        *
         * @param name
         * @param desc
         * @param owner
@@ -280,7 +275,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
        /**
         * static method giving kafka topic object
-        * 
+        *
         * @param db
         * @param basePath
         * @param name
@@ -291,7 +286,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         * @throws ConfigDbException
         */
        public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
-                       boolean transactionEnabled) throws ConfigDbException {
+                                                                                         boolean transactionEnabled) throws ConfigDbException {
                final JSONObject o = new JSONObject();
                o.put("owner", owner);
                o.put("description", desc);
@@ -303,14 +298,14 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
        /**
         * class performing all user opearation like user is eligible to read,
         * write. permitting a user to write and read,
-        * 
+        *
         * @author anowarul.islam
         *
         */
        public static class KafkaTopic implements Topic {
                /**
                 * constructor initializes
-                * 
+                *
                 * @param name
                 * @param configdb
                 * @param baseTopic
@@ -330,26 +325,26 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        fOwner = o.optString("owner", "");
                        fDesc = o.optString("description", "");
                        fTransactionEnabled = o.optBoolean("txenabled", false);// default
-                                                                                                                                       // value is
-                                                                                                                                       // false
+                       // value is
+                       // false
                        // if this topic has an owner, it needs both read/write ACLs. If there's no
-                                               // owner (or it's empty), null is okay -- this is for existing or implicitly
-                                               // created topics.
-                                               JSONObject readers = o.optJSONObject ( "readers" );
-                                               if ( readers == null && fOwner.length () > 0 )
-                                               {
-                                                   readers = kEmptyAcl;
-                                               }
-                                               fReaders =  fromJson ( readers );
-
-                                               JSONObject writers = o.optJSONObject ( "writers" );
-                                               if ( writers == null && fOwner.length () > 0 )
-                                               {
-                                                   writers = kEmptyAcl;
-                                               }
-                                               fWriters = fromJson ( writers );
+                       // owner (or it's empty), null is okay -- this is for existing or implicitly
+                       // created topics.
+                       JSONObject readers = o.optJSONObject ( "readers" );
+                       if ( readers == null && fOwner.length () > 0 )
+                       {
+                               readers = kEmptyAcl;
+                       }
+                       fReaders =  fromJson ( readers );
+
+                       JSONObject writers = o.optJSONObject ( "writers" );
+                       if ( writers == null && fOwner.length () > 0 )
+                       {
+                               writers = kEmptyAcl;
+                       }
+                       fWriters = fromJson ( writers );
                }
-               
+
                private NsaAcl fromJson(JSONObject o) {
                        NsaAcl acl = new NsaAcl();
                        if (o != null) {
@@ -427,7 +422,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        try
                        {
                                final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-       
+
                                // we have to assume we have current data, or load it again. for the expected use
                                // case, assuming we can overwrite the data is fine.
                                final JSONObject o = new JSONObject ();
@@ -435,15 +430,15 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
                                o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
                                fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
-                               
+
                                log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-       
+
                        }
                        catch ( ConfigDbException | AccessDeniedException x )
                        {
                                throw x;
                        }
-                       
+
                }
 
                private JSONObject safeSerialize(NsaAcl acl) {
@@ -458,7 +453,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                private final NsaAcl fReaders;
                private final NsaAcl fWriters;
                private boolean fTransactionEnabled;
-       
+
                public boolean isTransactionEnabled() {
                        return fTransactionEnabled;
                }