Merge "Fix sonar issues in messagerouter/msgrtr"
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaMetaBroker.java
index 1e20ee2..d7fa28b 100644 (file)
@@ -39,7 +39,7 @@ import org.apache.kafka.common.KafkaFuture;
 import org.json.JSONObject;
 import org.json.JSONArray;
 import org.springframework.beans.factory.annotation.Qualifier;
-
+import org.springframework.util.StringUtils;
 import org.onap.dmaap.dmf.mr.CambriaApiException;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
@@ -53,6 +53,7 @@ import com.att.nsa.configs.ConfigDb;
 import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.configs.ConfigPath;
 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.data.stringUtils;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
 import com.att.nsa.security.NsaAcl;
 import com.att.nsa.security.NsaAclUtils;
@@ -75,7 +76,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                final Properties props = new Properties ();
                String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                                "kafka.metadata.broker.list");
-               if (null == fkafkaBrokers) {
+               if (StringUtils.isEmpty(fkafkaBrokers)) {
 
                        fkafkaBrokers = "localhost:9092";
                }
@@ -221,28 +222,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                }
 
                // create via kafka
-               
-                       try
-                       {
-                               final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() );
-                               final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
-                               final KafkaFuture<Void> ctrResult = ctr.all ();
-                               ctrResult.get ();
-                               // underlying Kafka topic created. now setup our API info
-                               return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
-                       }
-                       catch ( InterruptedException e )
-                       {
-                               
-                               log.warn ( "Execution of describeTopics timed out." );
-                               throw new ConfigDbException ( e );
-                       }
-                       catch ( ExecutionException e )
-                       {
-                               
-                               log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
-                               throw new ConfigDbException ( e.getCause () );
-                       }
+
+        try {
+            final NewTopic topicRequest =
+                    new NewTopic(topic, partitions, (short)replicas);
+            final CreateTopicsResult ctr =
+                    fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+            final KafkaFuture<Void> ctrResult = ctr.all();
+            ctrResult.get();
+            // underlying Kafka topic created. now setup our API info
+            return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+        } catch (InterruptedException e) {
+            log.warn("Execution of describeTopics timed out.");
+            throw new ConfigDbException(e);
+        } catch (ExecutionException e) {
+            log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+            throw new ConfigDbException(e.getCause());
+        }
                
        }
 
@@ -347,11 +343,17 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                                // owner (or it's empty), null is okay -- this is for existing or implicitly
                                                // created topics.
                                                JSONObject readers = o.optJSONObject ( "readers" );
-                                               if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
+                                               if ( readers == null && fOwner.length () > 0 )
+                                               {
+                                                   readers = kEmptyAcl;
+                                               }
                                                fReaders =  fromJson ( readers );
 
                                                JSONObject writers = o.optJSONObject ( "writers" );
-                                               if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
+                                               if ( writers == null && fOwner.length () > 0 )
+                                               {
+                                                   writers = kEmptyAcl;
+                                               }
                                                fWriters = fromJson ( writers );
                }
                
@@ -444,11 +446,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
        
                        }
-                       catch ( ConfigDbException x )
-                       {
-                               throw x;
-                       }
-                       catch ( AccessDeniedException x )
+                       catch ( ConfigDbException | AccessDeniedException x )
                        {
                                throw x;
                        }