[DMAAP-MR] Get topics from kafka option
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPKafkaMetaBroker.java
index 7a08345..ae7414e 100644 (file)
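The whole change hinges on one environment variable, read once at class load. A minimal sketch of the switch semantics, assuming the variable is set in the service's environment before startup (the name "useZkTopicStore" comes from this change; the rest is illustrative):

    // true (the default): keep serving topic lists from the ZooKeeper store at /brokers/topics
    // false: serve topic lists straight from Kafka via AdminClient.listTopics()
    boolean useZkTopicStore = Boolean.parseBoolean(
            System.getenv().getOrDefault("useZkTopicStore", "true"));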
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.KafkaFuture;
 import org.json.JSONArray;
@@ -44,7 +45,6 @@ import org.onap.dmaap.dmf.mr.CambriaApiException;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
 import org.onap.dmaap.dmf.mr.metabroker.Topic;
-import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
 import org.onap.dmaap.dmf.mr.utils.Utils;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.util.StringUtils;
@@ -62,31 +62,35 @@ import java.util.concurrent.ExecutionException;
 //@Component
 public class DMaaPKafkaMetaBroker implements Broker1 {
 
+       private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class);
+       private final AdminClient fKafkaAdminClient;
+       private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(System.getenv().
+               getOrDefault("useZkTopicStore", "true"));
+       private final ZkClient fZk;
+       private final ConfigDb fCambriaConfig;
+       private final ConfigPath fBaseTopicData;
+       private static final String ZK_TOPICS_ROOT = "/brokers/topics";
+       private static final JSONObject kEmptyAcl = new JSONObject();
+
        public DMaaPKafkaMetaBroker() {
                fZk = null;
                fCambriaConfig = null;
                fBaseTopicData = null;
                final Properties props = new Properties ();
                String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "kafka.metadata.broker.list");
-               if (StringUtils.isEmpty(fkafkaBrokers)) {
+                       "kafka.metadata.broker.list");
 
+               if (StringUtils.isEmpty(fkafkaBrokers)) {
                        fkafkaBrokers = "localhost:9092";
                }
-
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
                if(Utils.isCadiEnabled()){
                        props.putAll(Utils.addSaslProps());
                }
                fKafkaAdminClient=AdminClient.create ( props );
-
        }
 
-       private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
-       private final AdminClient fKafkaAdminClient;
-
-
-
        /**
         * DMaaPKafkaMetaBroker constructor initializing
         *
@@ -95,50 +99,61 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         * @param configDb
         */
        public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
-                                                               @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+               @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
                final Properties props = new Properties ();
                String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "kafka.metadata.broker.list");
-               if (null == fkafkaBrokers) {
+                       "kafka.metadata.broker.list");
 
+               if (null == fkafkaBrokers) {
                        fkafkaBrokers = "localhost:9092";
                }
+               props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
 
                if(Utils.isCadiEnabled()){
                        props.putAll(Utils.addSaslProps());
                }
-               props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-
                fKafkaAdminClient=AdminClient.create ( props );
-
-
-
        }
 
-       public DMaaPKafkaMetaBroker( rrNvReadable settings,
-                                                                ZkClient zk,  ConfigDb configDb,AdminClient client) {
-
+       public DMaaPKafkaMetaBroker(ZkClient zk, ConfigDb configDb, AdminClient client) {
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
                fKafkaAdminClient= client;
-
-
-
        }
 
        @Override
        public List<Topic> getAllTopics() throws ConfigDbException {
                log.info("Retrieving list of all the topics.");
-               final LinkedList<Topic> result = new LinkedList<>();
+               if (!GET_TOPICS_FROM_ZK) {
+                       return getTopicsFromKafka();
+               }
+               return getTopicsFromZookeeper();
+       }
+
+       private LinkedList<Topic> getTopicsFromKafka() throws ConfigDbException {
+               LinkedList<Topic> res = new LinkedList<>();
+               final ListTopicsResult ltr = fKafkaAdminClient.listTopics();
                try {
-                       log.info("Retrieving all topics from root: " + zkTopicsRoot);
-                       final List<String> topics = fZk.getChildren(zkTopicsRoot);
+                       for (String name: ltr.names().get()) {
+                               res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData));
+                       }
+               } catch (InterruptedException | ExecutionException e) {
+                       log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from Kafka.", e);
+               }
+               return res;
+       }
+
+       private LinkedList<Topic> getTopicsFromZookeeper() throws ConfigDbException {
+               final LinkedList<Topic> legacyResult = new LinkedList<>();
+               try {
+                       log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT);
+                       final List<String> topics = fZk.getChildren(ZK_TOPICS_ROOT);
                        for (String topic : topics) {
-                               result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
+                               legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
                        }
                        JSONObject dataObj = new JSONObject();
                        dataObj.put("topics", new JSONObject());
@@ -148,17 +163,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        }
                } catch (ZkNoNodeException excp) {
                        // very fresh kafka doesn't have any topics or a topics node
-                       log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
+                       log.error("ZK doesn't have a Kafka topics node at " + ZK_TOPICS_ROOT, excp);
                }
-               return result;
+               return legacyResult;
        }
 
        @Override
        public Topic getTopic(String topic) throws ConfigDbException {
-               if (fZk.exists(zkTopicsRoot + "/" + topic)) {
+               if (!GET_TOPICS_FROM_ZK) {
+                       try {
+                               for (String name : fKafkaAdminClient.listTopics().names().get()) {
+                                       if (name.equals(topic)) {
+                                               log.debug("TOPIC_NAME: {} is equal to: {}", name, topic);
+                                               return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
+                                       }
+                               }
+                       } catch (InterruptedException | ExecutionException e) {
+                               log.error("GetTopicFromKafka: Failed to retrieve topic list from Kafka.", e);
+                               return null;
+                       }
+               } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) {
                        return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
                }
-               // else: no such topic in kafka
+               // else: no such topic
                return null;
        }
 
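For reference, a standalone sketch of the AdminClient pattern behind the Kafka-backed lookups above (getTopicsFromKafka and the getTopic fast path), assuming a broker reachable at localhost:9092, the same fallback the constructors use; class and topic names are illustrative:

    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class ListTopicsSketch {
        public static void main(String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // names() returns a KafkaFuture<Set<String>>; get() blocks until
                // the cluster replies (or throws ExecutionException on failure).
                final Set<String> names = admin.listTopics().names().get();
                System.out.println("Topics visible on the broker: " + names);
            }
        }
    }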
@@ -180,42 +207,38 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         */
        @Override
        public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
-                                                        boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
-               log.info("Creating topic: " + topic);
+               boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+               log.info("Creating topic: {}", topic);
                try {
-                       log.info("Check if topic [" + topic + "] exist.");
+                       log.info("Checking if topic [{}] exists.", topic);
                        // first check for existence "our way"
                        final Topic t = getTopic(topic);
                        if (t != null) {
-                               log.info("Could not create topic [" + topic + "]. Topic Already exists.");
+                               log.info("Could not create topic [{}]. Topic already exists.", topic);
                                throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
                        }
                } catch (ConfigDbException e1) {
                        log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
                        throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
-                                       "Couldn't check topic data in config db.");
+                               "Couldn't check topic data in config db.");
                }
 
                // we only allow 3 replicas. (If we don't test this, we get weird
                // results from the cluster,
                // so explicit test and fail.)
                if (replicas < 1 || replicas > 3) {
-                       log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
+                       log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic);
                        throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
-                                       "The replica count must be between 1 and 3.");
+                               "The replica count must be between 1 and 3.");
                }
                if (partitions < 1) {
-                       log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
+                       log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic);
                        throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
                }
-
                // create via kafka
-
                try {
-                       final NewTopic topicRequest =
-                                       new NewTopic(topic, partitions, (short)replicas);
-                       final CreateTopicsResult ctr =
-                                       fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+                       final NewTopic topicRequest = new NewTopic(topic, partitions, (short)replicas);
+                       final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
                        final KafkaFuture<Void> ctrResult = ctr.all();
                        ctrResult.get();
                        // underlying Kafka topic created. now setup our API info
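The creation path uses the same future-based pattern. A condensed sketch, assuming "admin" is an AdminClient as in the previous sketch; the topic name and counts are illustrative:

    // One NewTopic request with 3 partitions and 3 replicas (the bounds the
    // validation above enforces); all().get() blocks until Kafka confirms.
    final NewTopic request = new NewTopic("example.topic", 3, (short) 3);
    admin.createTopics(Arrays.asList(request)).all().get();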
@@ -232,16 +255,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
        @Override
        public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
-               log.info("Deleting topic: " + topic);
+               log.info("Deleting topic: {}", topic);
                try {
                        log.info("Loading zookeeper client for topic deletion.");
                        // topic creation. (Otherwise, the topic is only partially created
                        // in ZK.)
-
-
                        fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
                        log.info("Zookeeper client loaded successfully. Deleting topic.");
-
                } catch (Exception e) {
                        log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
                        throw new ConfigDbException(e);
@@ -250,13 +270,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                }
        }
 
-       private final ZkClient fZk;
-       private final ConfigDb fCambriaConfig;
-       private final ConfigPath fBaseTopicData;
-
-       private static final String zkTopicsRoot = "/brokers/topics";
-       private static final JSONObject kEmptyAcl = new JSONObject();
-
        /**
         * method Providing KafkaTopic Object associated with owner and
         * transactionenabled or not
@@ -269,7 +282,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         * @throws ConfigDbException
         */
        public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
-                       throws ConfigDbException {
+               throws ConfigDbException {
                return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
        }
 
@@ -286,18 +299,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         * @throws ConfigDbException
         */
        public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
-                                                                                         boolean transactionEnabled) throws ConfigDbException {
+               boolean transactionEnabled) throws ConfigDbException {
                final JSONObject o = new JSONObject();
                o.put("owner", owner);
                o.put("description", desc);
                o.put("txenabled", transactionEnabled);
-               db.store(basePath.getChild(name), o.toString());
+               if (GET_TOPICS_FROM_ZK) {
+                       db.store(basePath.getChild(name), o.toString());
+               }
                return new KafkaTopic(name, db, basePath);
        }
 
        /**
-        * class performing all user opearation like user is eligible to read,
-        * write. permitting a user to write and read,
+        * class performing all user operations, such as checking whether a user is
+        * eligible to read or write, and permitting a user to read or write
         *
         * @author anowarul.islam
         *
@@ -311,6 +326,16 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                 * @param baseTopic
                 * @throws ConfigDbException
                 */
+
+               private final String fName;
+               private final ConfigDb fConfigDb;
+               private final ConfigPath fBaseTopicData;
+               private final String fOwner;
+               private final String fDesc;
+               private final NsaAcl fReaders;
+               private final NsaAcl fWriters;
+               private final boolean fTransactionEnabled;
+
                public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
                        fName = name;
                        fConfigDb = configdb;
@@ -396,7 +421,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                @Override
                public void permitWritesFromUser(String pubId, NsaApiKey asUser)
-                               throws ConfigDbException, AccessDeniedException {
+                       throws ConfigDbException, AccessDeniedException {
                        updateAcl(asUser, false, true, pubId);
                }
 
@@ -407,22 +432,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                @Override
                public void permitReadsByUser(String consumerId, NsaApiKey asUser)
-                               throws ConfigDbException, AccessDeniedException {
+                       throws ConfigDbException, AccessDeniedException {
                        updateAcl(asUser, true, true, consumerId);
                }
 
                @Override
                public void denyReadsByUser(String consumerId, NsaApiKey asUser)
-                               throws ConfigDbException, AccessDeniedException {
+                       throws ConfigDbException, AccessDeniedException {
                        updateAcl(asUser, true, false, consumerId);
                }
 
                private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
-                               throws ConfigDbException, AccessDeniedException{
-                       try
-                       {
+                       throws ConfigDbException, AccessDeniedException{
+                       try {
                                final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
                                // we have to assume we have current data, or load it again. for the expected use
                                // case, assuming we can overwrite the data is fine.
                                final JSONObject o = new JSONObject ();
@@ -432,10 +455,8 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
 
                                log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
-                       }
-                       catch ( ConfigDbException | AccessDeniedException x )
-                       {
+                       } catch ( ConfigDbException | AccessDeniedException x ) {
+                               log.error("Error when trying to update ACL for key {}", key);
                                throw x;
                        }
 
@@ -445,15 +466,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        return acl == null ? null : acl.serialize();
                }
 
-               private final String fName;
-               private final ConfigDb fConfigDb;
-               private final ConfigPath fBaseTopicData;
-               private final String fOwner;
-               private final String fDesc;
-               private final NsaAcl fReaders;
-               private final NsaAcl fWriters;
-               private boolean fTransactionEnabled;
-
                public boolean isTransactionEnabled() {
                        return fTransactionEnabled;
                }
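The slimmed-down constructor that accepts an AdminClient directly makes the Kafka-backed path straightforward to exercise with a mocked client. A hypothetical JUnit/Mockito sketch (assumes Mockito and JUnit 4 on the test classpath, useZkTopicStore=false in the environment, and the project's com.att.nsa.configs.ConfigDb type; the test name is illustrative):

    import static org.mockito.Mockito.*;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListTopicsResult;
    import org.apache.kafka.common.KafkaFuture;
    import org.junit.Test;
    import com.att.nsa.configs.ConfigDb;

    public class DMaaPKafkaMetaBrokerKafkaPathTest {
        @Test
        public void getAllTopicsListsNamesFromKafka() throws Exception {
            final AdminClient admin = mock(AdminClient.class);
            final ListTopicsResult ltr = mock(ListTopicsResult.class);
            final Set<String> names = new HashSet<>(Arrays.asList("topic-a", "topic-b"));
            when(admin.listTopics()).thenReturn(ltr);
            when(ltr.names()).thenReturn(KafkaFuture.completedFuture(names));
            final DMaaPKafkaMetaBroker broker =
                    new DMaaPKafkaMetaBroker(null, mock(ConfigDb.class), admin);
            // With useZkTopicStore=false, broker.getAllTopics() would return one
            // KafkaTopic per name above, once the ConfigDb mock serves per-topic
            // metadata (omitted here, since KafkaTopic reads it in its constructor).
        }
    }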