Merge "Fix sonar issues in messagerouter/msgrtr"
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaMetaBroker.java
index a4ae2be..d7fa28b 100644 (file)
@@ -39,16 +39,13 @@ import org.apache.kafka.common.KafkaFuture;
 import org.json.JSONObject;
 import org.json.JSONArray;
 import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Component;
-
+import org.springframework.util.StringUtils;
 import org.onap.dmaap.dmf.mr.CambriaApiException;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-import org.onap.dmaap.dmf.mr.metabroker.Broker;
 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
 import org.onap.dmaap.dmf.mr.metabroker.Topic;
 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
 import org.onap.dmaap.dmf.mr.utils.Utils;
-//import org.apache.log4-j.Logger;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 
@@ -56,6 +53,7 @@ import com.att.nsa.configs.ConfigDb;
 import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.configs.ConfigPath;
 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.data.stringUtils;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
 import com.att.nsa.security.NsaAcl;
 import com.att.nsa.security.NsaAclUtils;
@@ -78,7 +76,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                final Properties props = new Properties ();
                String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                                "kafka.metadata.broker.list");
-               if (null == fkafkaBrokers) {
+               if (StringUtils.isEmpty(fkafkaBrokers)) {
 
                        fkafkaBrokers = "localhost:9092";
                }
@@ -94,7 +92,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
            
        }
 
-       //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
        private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
        private final AdminClient fKafkaAdminClient;
        
@@ -109,7 +106,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         */
        public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
                        @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
-               //fSettings = settings;
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
@@ -149,7 +145,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
        @Override
        public List<Topic> getAllTopics() throws ConfigDbException {
                log.info("Retrieving list of all the topics.");
-               final LinkedList<Topic> result = new LinkedList<Topic>();
+               final LinkedList<Topic> result = new LinkedList<>();
                try {
                        log.info("Retrieving all topics from root: " + zkTopicsRoot);
                        final List<String> topics = fZk.getChildren(zkTopicsRoot);
@@ -226,35 +222,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                }
 
                // create via kafka
-               
-                       try
-                       {
-                               final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () );
-                               final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
-                               final KafkaFuture<Void> ctrResult = ctr.all ();
-                               ctrResult.get ();
-                               // underlying Kafka topic created. now setup our API info
-                               return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
-                       }
-                       catch ( InterruptedException e )
-                       {
-                               
-                               log.warn ( "Execution of describeTopics timed out." );
-                               throw new ConfigDbException ( e );
-                       }
-                       catch ( ExecutionException e )
-                       {
-                               
-                               log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
-                               throw new ConfigDbException ( e.getCause () );
-                       }
+
+        try {
+            final NewTopic topicRequest =
+                    new NewTopic(topic, partitions, (short)replicas);
+            final CreateTopicsResult ctr =
+                    fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+            final KafkaFuture<Void> ctrResult = ctr.all();
+            ctrResult.get();
+            // underlying Kafka topic created. now setup our API info
+            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+        } catch (InterruptedException e) {
+            log.warn("Execution of describeTopics timed out.");
+            throw new ConfigDbException(e);
+        } catch (ExecutionException e) {
+            log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+            throw new ConfigDbException(e.getCause());
+        }
                
        }
 
        @Override
        public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
                log.info("Deleting topic: " + topic);
-               ZkClient zkClient = null;
                try {
                        log.info("Loading zookeeper client for topic deletion.");
                                        // topic creation. (Otherwise, the topic is only partially created
@@ -269,15 +259,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        throw new ConfigDbException(e);
                }  finally {
                        log.info("Closing zookeeper connection.");
-                       if (zkClient != null)
-                               zkClient.close();
                }
-
-               // throw new UnsupportedOperationException ( "We can't programmatically
-               // delete Kafka topics yet." );
        }
 
-       //private final rrNvReadable fSettings;
        private final ZkClient fZk;
        private final ConfigDb fCambriaConfig;
        private final ConfigPath fBaseTopicData;
@@ -359,11 +343,17 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                                // owner (or it's empty), null is okay -- this is for existing or implicitly
                                                // created topics.
                                                JSONObject readers = o.optJSONObject ( "readers" );
-                                               if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
+                                               if ( readers == null && fOwner.length () > 0 )
+                                               {
+                                                   readers = kEmptyAcl;
+                                               }
                                                fReaders =  fromJson ( readers );
 
                                                JSONObject writers = o.optJSONObject ( "writers" );
-                                               if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
+                                               if ( writers == null && fOwner.length () > 0 )
+                                               {
+                                                   writers = kEmptyAcl;
+                                               }
                                                fWriters = fromJson ( writers );
                }
                
@@ -456,11 +446,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
        
                        }
-                       catch ( ConfigDbException x )
-                       {
-                               throw x;
-                       }
-                       catch ( AccessDeniedException x )
+                       catch ( ConfigDbException | AccessDeniedException x )
                        {
                                throw x;
                        }
@@ -486,7 +472,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                @Override
                public Set<String> getOwners() {
-                       final TreeSet<String> owners = new TreeSet<String> ();
+                       final TreeSet<String> owners = new TreeSet<>();
                        owners.add ( fOwner );
                        return owners;
                }