X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fbeans%2FDMaaPKafkaMetaBroker.java;h=d7fa28b7b4d5e23c4e7ad47517df44dd6b07f727;hb=db3bff2db9380164b21772398c5e67e423a1d58d;hp=03a1bd5d78f61e51d58a31605b0f297af02e7093;hpb=49924c6b07a7f9418f04d09f9c94fac8e7d44485;p=dmaap%2Fmessagerouter%2Fmsgrtr.git diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 03a1bd5..d7fa28b 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -222,28 +222,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } // create via kafka - - try - { - final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() ); - final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) ); - final KafkaFuture ctrResult = ctr.all (); - ctrResult.get (); - // underlying Kafka topic created. now setup our API info - return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled ); - } - catch ( InterruptedException e ) - { - - log.warn ( "Execution of describeTopics timed out." ); - throw new ConfigDbException ( e ); - } - catch ( ExecutionException e ) - { - - log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); - throw new ConfigDbException ( e.getCause () ); - } + + try { + final NewTopic topicRequest = + new NewTopic(topic, partitions, (short)replicas); + final CreateTopicsResult ctr = + fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); + final KafkaFuture ctrResult = ctr.all(); + ctrResult.get(); + // underlying Kafka topic created. now setup our API info + return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); + } catch (InterruptedException e) { + log.warn("Execution of describeTopics timed out."); + throw new ConfigDbException(e); + } catch (ExecutionException e) { + log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e); + throw new ConfigDbException(e.getCause()); + } } @@ -348,11 +343,17 @@ public class DMaaPKafkaMetaBroker implements Broker1 { // owner (or it's empty), null is okay -- this is for existing or implicitly // created topics. JSONObject readers = o.optJSONObject ( "readers" ); - if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl; + if ( readers == null && fOwner.length () > 0 ) + { + readers = kEmptyAcl; + } fReaders = fromJson ( readers ); JSONObject writers = o.optJSONObject ( "writers" ); - if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl; + if ( writers == null && fOwner.length () > 0 ) + { + writers = kEmptyAcl; + } fWriters = fromJson ( writers ); } @@ -445,11 +446,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); } - catch ( ConfigDbException x ) - { - throw x; - } - catch ( AccessDeniedException x ) + catch ( ConfigDbException | AccessDeniedException x ) { throw x; }