add testcases
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaMetaBroker.java
index a4ae2be..03a1bd5 100644 (file)
@@ -39,16 +39,13 @@ import org.apache.kafka.common.KafkaFuture;
 import org.json.JSONObject;
 import org.json.JSONArray;
 import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Component;
-
+import org.springframework.util.StringUtils;
 import org.onap.dmaap.dmf.mr.CambriaApiException;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-import org.onap.dmaap.dmf.mr.metabroker.Broker;
 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
 import org.onap.dmaap.dmf.mr.metabroker.Topic;
 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
 import org.onap.dmaap.dmf.mr.utils.Utils;
-//import org.apache.log4-j.Logger;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 
@@ -56,6 +53,7 @@ import com.att.nsa.configs.ConfigDb;
 import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.configs.ConfigPath;
 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.data.stringUtils;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
 import com.att.nsa.security.NsaAcl;
 import com.att.nsa.security.NsaAclUtils;
@@ -78,7 +76,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                final Properties props = new Properties ();
                String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                                "kafka.metadata.broker.list");
-               if (null == fkafkaBrokers) {
+               if (StringUtils.isEmpty(fkafkaBrokers)) {
 
                        fkafkaBrokers = "localhost:9092";
                }
@@ -94,7 +92,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
            
        }
 
-       //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
        private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
        private final AdminClient fKafkaAdminClient;
        
@@ -109,7 +106,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         */
        public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
                        @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
-               //fSettings = settings;
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
@@ -149,7 +145,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
        @Override
        public List<Topic> getAllTopics() throws ConfigDbException {
                log.info("Retrieving list of all the topics.");
-               final LinkedList<Topic> result = new LinkedList<Topic>();
+               final LinkedList<Topic> result = new LinkedList<>();
                try {
                        log.info("Retrieving all topics from root: " + zkTopicsRoot);
                        final List<String> topics = fZk.getChildren(zkTopicsRoot);
@@ -229,7 +225,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                
                        try
                        {
-                               final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () );
+                               final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() );
                                final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
                                final KafkaFuture<Void> ctrResult = ctr.all ();
                                ctrResult.get ();
@@ -254,7 +250,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
        @Override
        public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
                log.info("Deleting topic: " + topic);
-               ZkClient zkClient = null;
                try {
                        log.info("Loading zookeeper client for topic deletion.");
                                        // topic creation. (Otherwise, the topic is only partially created
@@ -269,15 +264,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        throw new ConfigDbException(e);
                }  finally {
                        log.info("Closing zookeeper connection.");
-                       if (zkClient != null)
-                               zkClient.close();
                }
-
-               // throw new UnsupportedOperationException ( "We can't programmatically
-               // delete Kafka topics yet." );
        }
 
-       //private final rrNvReadable fSettings;
        private final ZkClient fZk;
        private final ConfigDb fCambriaConfig;
        private final ConfigPath fBaseTopicData;
@@ -486,7 +475,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                @Override
                public Set<String> getOwners() {
-                       final TreeSet<String> owners = new TreeSet<String> ();
+                       final TreeSet<String> owners = new TreeSet<>();
                        owners.add ( fOwner );
                        return owners;
                }