- try {
- ZkClient zkClient = null;
- try {
- log.info("Loading zookeeper client for creating topic.");
- // FIXME: use of this scala module$ thing is a goofy hack to
- // make Kafka aware of the
- // topic creation. (Otherwise, the topic is only partially
- // created in ZK.)
- zkClient = ZkClientFactory.createZkClient();
- log.info("Zookeeper client loaded successfully. Creating topic.");
- AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties());
- } catch (kafka.common.TopicExistsException e) {
- log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e);
- throw new TopicExistsException(topic);
- } catch (ZkNoNodeException e) {
- log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e);
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "The Kafka cluster is not setup.");
- } catch (kafka.admin.AdminOperationException e) {
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(),
- e);
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "The Kafka cluster can't handle your request. Talk to the administrators.");
- } finally {
- log.info("Closing zookeeper connection.");
- if (zkClient != null)
- zkClient.close();
+
+ try
+ {
+ final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () );
+ final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
+ final KafkaFuture<Void> ctrResult = ctr.all ();
+ ctrResult.get ();
+ // underlying Kafka topic created. now setup our API info
+ return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );