update the testcases after the kafka 11 changes
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / beans / DMaaPKafkaMetaBroker.java
@@ -8,39 +8,49 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+*  
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *
+ *  
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
  *  
  *******************************************************************************/
-package com.att.nsa.cambria.beans;
+package com.att.dmf.mr.beans;
 
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.json.JSONObject;
+import org.json.JSONArray;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.metabroker.Broker;
+import com.att.dmf.mr.metabroker.Broker1;
+import com.att.dmf.mr.metabroker.Topic;
+import com.att.dmf.mr.utils.ConfigurationReader;
 //import org.apache.log4-j.Logger;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.metabroker.Broker;
-import com.att.nsa.cambria.metabroker.Topic;
-import com.att.nsa.cambria.utils.ConfigurationReader;
+//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
 import com.att.nsa.configs.ConfigDb;
 import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.configs.ConfigPath;
@@ -50,20 +60,42 @@ import com.att.nsa.security.NsaAcl;
 import com.att.nsa.security.NsaAclUtils;
 import com.att.nsa.security.NsaApiKey;
 
-import kafka.admin.AdminUtils;
-import kafka.utils.ZKStringSerializer$;
 
 /**
  * class performing all topic operations
  * 
- * @author author
+ * @author anowarul.islam
  *
  */
-
-public class DMaaPKafkaMetaBroker implements Broker {
+//@Component
+public class DMaaPKafkaMetaBroker implements Broker1 {
+
+       public DMaaPKafkaMetaBroker() {
+               fZk = null;
+               fCambriaConfig = null;
+               fBaseTopicData = null;
+               final Properties props = new Properties ();
+               String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                               "kafka.metadata.broker.list");
+               if (null == fkafkaBrokers) {
+
+                       fkafkaBrokers = "localhost:9092";
+               }
+               
+               
+               
+            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+           /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
+            props.put("sasl.mechanism", "PLAIN");*/
+            fKafkaAdminClient=AdminClient.create ( props );
+           // fKafkaAdminClient = null;
+       }
 
        //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
        private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
+       private final AdminClient fKafkaAdminClient;
+       
        
 
        /**
@@ -79,6 +111,36 @@ public class DMaaPKafkaMetaBroker implements Broker {
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
+               final Properties props = new Properties ();
+               String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                               "kafka.metadata.broker.list");
+               if (null == fkafkaBrokers) {
+
+                       fkafkaBrokers = "localhost:9092";
+               }
+               
+               
+               
+            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+            /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
+            props.put("sasl.mechanism", "PLAIN");*/
+            fKafkaAdminClient=AdminClient.create ( props );
+           // fKafkaAdminClient = null;
+               
+               
+       }
+       
+       public DMaaPKafkaMetaBroker( rrNvReadable settings,
+                       ZkClient zk,  ConfigDb configDb,AdminClient client) {
+               //fSettings = settings;
+               fZk = zk;
+               fCambriaConfig = configDb;
+               fBaseTopicData = configDb.parse("/topics");
+           fKafkaAdminClient= client;
+           // fKafkaAdminClient = null;
+               
+               
        }
 
        @Override
@@ -91,7 +153,6 @@ public class DMaaPKafkaMetaBroker implements Broker {
                        for (String topic : topics) {
                                result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
                        }
-
                        JSONObject dataObj = new JSONObject();
                        dataObj.put("topics", new JSONObject());
 
@@ -132,7 +193,7 @@ public class DMaaPKafkaMetaBroker implements Broker {
         */
        @Override
        public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
-                       boolean transactionEnabled) throws TopicExistsException, CambriaApiException {
+                       boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
                log.info("Creating topic: " + topic);
                try {
                        log.info("Check if topic [" + topic + "] exist.");
@@ -162,78 +223,53 @@ public class DMaaPKafkaMetaBroker implements Broker {
                }
 
                // create via kafka
-               try {
-                       ZkClient zkClient = null;
-                       try {
-                               log.info("Loading zookeeper client for creating topic.");
-                               // FIXME: use of this scala module$ thing is a goofy hack to
-                               // make Kafka aware of the
-                               // topic creation. (Otherwise, the topic is only partially
-                               // created in ZK.)
-                               zkClient = ZkClientFactory.createZkClient();
-                               log.info("Zookeeper client loaded successfully. Creating topic.");
-                               AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties());
-                       } catch (kafka.common.TopicExistsException e) {
-                               log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e);
-                               throw new TopicExistsException(topic);
-                       } catch (ZkNoNodeException e) {
-                               log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e);
-                               // Kafka throws this when the server isn't running (and perhaps
-                               // hasn't ever run)
-                               throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
-                                               "The Kafka cluster is not setup.");
-                       } catch (kafka.admin.AdminOperationException e) {
-                               // Kafka throws this when the server isn't running (and perhaps
-                               // hasn't ever run)
-                               log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(),
-                                               e);
-                               throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
-                                               "The Kafka cluster can't handle your request. Talk to the administrators.");
-                       } finally {
-                               log.info("Closing zookeeper connection.");
-                               if (zkClient != null)
-                                       zkClient.close();
+               
+                       try
+                       {
+                               final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () );
+                               final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
+                               final KafkaFuture<Void> ctrResult = ctr.all ();
+                               ctrResult.get ();
+                               // underlying Kafka topic created. now setup our API info
+                               return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
                        }
-
-                       log.info("Creating topic entry for topic: " + topic);
-                       // underlying Kafka topic created. now setup our API info
-                       return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
-               } catch (ConfigDbException excp) {
-                       log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp);
-                       throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
-                                       "Failed to create topic data. Talk to the administrators.");
-               }
+                       catch ( InterruptedException e )
+                       {
+                               //timer.fail ( "Timeout" );
+                               log.warn ( "Execution of describeTopics timed out." );
+                               throw new ConfigDbException ( e );
+                       }
+                       catch ( ExecutionException e )
+                       {
+                               //timer.fail ( "ExecutionError" );
+                               log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
+                               throw new ConfigDbException ( e.getCause () );
+                       }
+               
        }
 
        @Override
-       public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException {
+       public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
                log.info("Deleting topic: " + topic);
                ZkClient zkClient = null;
                try {
                        log.info("Loading zookeeper client for topic deletion.");
-                       // FIXME: use of this scala module$ thing is a goofy hack to make
-                       // Kafka aware of the
-                       // topic creation. (Otherwise, the topic is only partially created
+                                       // topic creation. (Otherwise, the topic is only partially created
                        // in ZK.)
-                       zkClient = ZkClientFactory.createZkClient();
-
+                       /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
+                                       ZKStringSerializer$.MODULE$);
+                       String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
+                       if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
+                       ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
+                       */
+                       
+                       fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
                        log.info("Zookeeper client loaded successfully. Deleting topic.");
-                       AdminUtils.deleteTopic(zkClient, topic);
-               } catch (kafka.common.TopicExistsException e) {
+                       //AdminUtils.deleteTopic(zkutils, topic);
+               } catch (Exception e) {
                        log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
-                       throw new TopicExistsException(topic);
-               } catch (ZkNoNodeException e) {
-                       log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not setup." + e.getMessage(), e);
-                       // Kafka throws this when the server isn't running (and perhaps
-                       // hasn't ever run)
-                       throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not setup.");
-               } catch (kafka.admin.AdminOperationException e) {
-                       // Kafka throws this when the server isn't running (and perhaps
-                       // hasn't ever run)
-                       log.error("The Kafka cluster can't handle your request. Talk to the administrators." + e.getMessage(), e);
-                       throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
-                                       "The Kafka cluster can't handle your request. Talk to the administrators.");
-               } finally {
+                       throw new ConfigDbException(e);
+               }  finally {
                        log.info("Closing zookeeper connection.");
                        if (zkClient != null)
                                zkClient.close();
@@ -242,8 +278,6 @@ public class DMaaPKafkaMetaBroker implements Broker {
                // throw new UnsupportedOperationException ( "We can't programmatically
                // delete Kafka topics yet." );
        }
-       
-       
 
        //private final rrNvReadable fSettings;
        private final ZkClient fZk;
@@ -295,7 +329,7 @@ public class DMaaPKafkaMetaBroker implements Broker {
         * class performing all user opearation like user is eligible to read,
         * write. permitting a user to write and read,
         * 
-        * @author author
+        * @author anowarul.islam
         *
         */
        public static class KafkaTopic implements Topic {
@@ -328,25 +362,27 @@ public class DMaaPKafkaMetaBroker implements Broker {
                                                // created topics.
                                                JSONObject readers = o.optJSONObject ( "readers" );
                                                if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
-                                               fReaders = fromJson ( readers );
+                                               fReaders =  fromJson ( readers );
 
                                                JSONObject writers = o.optJSONObject ( "writers" );
                                                if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
                                                fWriters = fromJson ( writers );
                }
-                private NsaAcl fromJson(JSONObject o) {
-                               NsaAcl acl = new NsaAcl();
-                               if (o != null) {
-                                       JSONArray a = o.optJSONArray("allowed");
-                                       if (a != null) {
-                                               for (int i = 0; i < a.length(); ++i) {
-                                                       String user = a.getString(i);
-                                                       acl.add(user);
-                                               }
+               
+               private NsaAcl fromJson(JSONObject o) {
+                       NsaAcl acl = new NsaAcl();
+                       if (o != null) {
+                               JSONArray a = o.optJSONArray("allowed");
+                               if (a != null) {
+                                       for (int i = 0; i < a.length(); ++i) {
+                                               String user = a.getString(i);
+                                               acl.add(user);
                                        }
                                }
-                               return acl;
                        }
+                       return acl;
+               }
+
                @Override
                public String getName() {
                        return fName;
@@ -445,7 +481,7 @@ public class DMaaPKafkaMetaBroker implements Broker {
                private final NsaAcl fReaders;
                private final NsaAcl fWriters;
                private boolean fTransactionEnabled;
-
+       
                public boolean isTransactionEnabled() {
                        return fTransactionEnabled;
                }