X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fbeans%2FDMaaPKafkaMetaBroker.java;fp=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fbeans%2FDMaaPKafkaMetaBroker.java;h=1e20ee2a6d271ed154df91fb7e670406fc84034b;hb=0fa52b6adaf4d163a12ca38a750780f99d78b852;hp=a4ae2be833099af0a47ac104fc2d3883b90dfef3;hpb=ef236590f93ec27bee7e69847696b8e8d14ffeff;p=dmaap%2Fmessagerouter%2Fmsgrtr.git

diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index a4ae2be..1e20ee2 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -39,16 +39,13 @@ import org.apache.kafka.common.KafkaFuture;
 import org.json.JSONObject;
 import org.json.JSONArray;
 import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Component;
 
 import org.onap.dmaap.dmf.mr.CambriaApiException;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-import org.onap.dmaap.dmf.mr.metabroker.Broker;
 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
 import org.onap.dmaap.dmf.mr.metabroker.Topic;
 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
 import org.onap.dmaap.dmf.mr.utils.Utils;
-//import org.apache.log4j.Logger;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
@@ -94,7 +91,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 	}
 
 
-	//private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
 	private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
 
 	private final AdminClient fKafkaAdminClient;
@@ -109,7 +105,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 	 */
 	public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
 			@Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
-		//fSettings = settings;
 		fZk = zk;
 		fCambriaConfig = configDb;
 		fBaseTopicData = configDb.parse("/topics");
@@ -149,7 +144,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 	@Override
 	public List getAllTopics() throws ConfigDbException {
 		log.info("Retrieving list of all the topics.");
-		final LinkedList result = new LinkedList();
+		final LinkedList result = new LinkedList<>();
 		try {
 			log.info("Retrieving all topics from root: " + zkTopicsRoot);
 			final List topics = fZk.getChildren(zkTopicsRoot);
@@ -229,7 +224,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
 		try {
 
-			final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue () );
+			final NewTopic topicRequest = new NewTopic ( topic, partitions, new Integer(replicas).shortValue() );
 			final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
 			final KafkaFuture ctrResult = ctr.all ();
 			ctrResult.get ();
@@ -254,7 +249,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 	@Override
 	public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
 		log.info("Deleting topic: " + topic);
-		ZkClient zkClient = null;
 		try {
 			log.info("Loading zookeeper client for topic deletion.");
 			// topic creation. (Otherwise, the topic is only partially created
@@ -269,15 +263,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 			throw new ConfigDbException(e);
 		} finally {
 			log.info("Closing zookeeper connection.");
-			if (zkClient != null)
-				zkClient.close();
 		}
-
-		// throw new UnsupportedOperationException ( "We can't programmatically
-		// delete Kafka topics yet." );
 	}
 
-	//private final rrNvReadable fSettings;
 	private final ZkClient fZk;
 	private final ConfigDb fCambriaConfig;
 	private final ConfigPath fBaseTopicData;
@@ -486,7 +474,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
 		@Override
 		public Set getOwners() {
-			final TreeSet owners = new TreeSet ();
+			final TreeSet owners = new TreeSet<>();
 			owners.add ( fOwner );
 			return owners;
 		}
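
For context, the patch removes the unused ZkClient handling from deleteTopic() while the class already drives topic creation through the Kafka AdminClient (fKafkaAdminClient.createTopics is visible in the context lines above). Below is a minimal, self-contained sketch of that AdminClient create/delete pattern; the bootstrap address "localhost:9092", the topic name "example.topic", and the class name AdminClientTopicSketch are placeholders for illustration and do not come from the patch.

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class AdminClientTopicSketch {

	public static void main(String[] args) throws Exception {
		// Placeholder broker address; in DMaaP the value comes from configuration.
		final Properties props = new Properties();
		props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

		try (AdminClient admin = AdminClient.create(props)) {
			// Create a topic, mirroring the NewTopic/createTopics call seen in createTopic().
			final NewTopic request = new NewTopic("example.topic", 3, (short) 1);
			try {
				admin.createTopics(Arrays.asList(request)).all().get();
			} catch (ExecutionException e) {
				if (e.getCause() instanceof TopicExistsException) {
					System.out.println("Topic already exists; skipping creation.");
				} else {
					throw e;
				}
			}

			// Delete a topic through the same AdminClient. The patched deleteTopic()
			// body is not shown in full in the diff, so this is only the general
			// pattern; no separate ZkClient has to be opened or closed for it, which
			// is consistent with the patch dropping the zkClient null-check and
			// close() from the finally block.
			admin.deleteTopics(Arrays.asList("example.topic")).all().get();
		}
	}
}

The sketch blocks on all().get() after each request, matching the ctrResult.get() call in the diff, so a failed creation or deletion surfaces as an ExecutionException rather than being silently ignored.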