-
- try
- {
- final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() );
- final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
- final KafkaFuture<Void> ctrResult = ctr.all ();
- ctrResult.get ();
- // underlying Kafka topic created. now setup our API info
- return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
- }
- catch ( InterruptedException e )
- {
-
- log.warn ( "Execution of describeTopics timed out." );
- throw new ConfigDbException ( e );
- }
- catch ( ExecutionException e )
- {
-
- log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
- throw new ConfigDbException ( e.getCause () );
- }
+
+ try {
+ final NewTopic topicRequest =
+ new NewTopic(topic, partitions, (short)replicas);
+ final CreateTopicsResult ctr =
+ fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+ final KafkaFuture<Void> ctrResult = ctr.all();
+ ctrResult.get();
+ // underlying Kafka topic created. now setup our API info
+ return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+ } catch (InterruptedException e) {
+ log.warn("Execution of describeTopics timed out.");
+ throw new ConfigDbException(e);
+ } catch (ExecutionException e) {
+ log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+ throw new ConfigDbException(e.getCause());
+ }