X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fbeans%2FDMaaPKafkaMetaBroker.java;fp=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fbeans%2FDMaaPKafkaMetaBroker.java;h=ae7414ec8ef72ffe377c1b98e2a8a2943262a673;hb=e6afd43dbc9d4f5979bbc9dc1309d826aa8cf58d;hp=7a08345f88f40a629bd7809e7a1df6908e44ce04;hpb=c5a65e41585490b9fa2b2427ecee15a2b66f11f6;p=dmaap%2Fmessagerouter%2Fmessageservice.git diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 7a08345..ae7414e 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -36,6 +36,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; import org.json.JSONArray; @@ -44,7 +45,6 @@ import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.metabroker.Broker1; import org.onap.dmaap.dmf.mr.metabroker.Topic; -import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; import org.onap.dmaap.dmf.mr.utils.Utils; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.StringUtils; @@ -62,31 +62,35 @@ import java.util.concurrent.ExecutionException; //@Component public class DMaaPKafkaMetaBroker implements Broker1 { + private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class); + private final AdminClient fKafkaAdminClient; + private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(System.getenv(). + getOrDefault("useZkTopicStore", "true")); + private final ZkClient fZk; + private final ConfigDb fCambriaConfig; + private final ConfigPath fBaseTopicData; + private static final String ZK_TOPICS_ROOT = "/brokers/topics"; + private static final JSONObject kEmptyAcl = new JSONObject(); + public DMaaPKafkaMetaBroker() { fZk = null; fCambriaConfig = null; fBaseTopicData = null; final Properties props = new Properties (); String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "kafka.metadata.broker.list"); - if (StringUtils.isEmpty(fkafkaBrokers)) { + "kafka.metadata.broker.list"); + if (StringUtils.isEmpty(fkafkaBrokers)) { fkafkaBrokers = "localhost:9092"; } - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + if(Utils.isCadiEnabled()){ props.putAll(Utils.addSaslProps()); } fKafkaAdminClient=AdminClient.create ( props ); - } - private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); - private final AdminClient fKafkaAdminClient; - - - /** * DMaaPKafkaMetaBroker constructor initializing * @@ -95,50 +99,61 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @param configDb */ public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings, - @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { + @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); final Properties props = new Properties (); String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "kafka.metadata.broker.list"); - if (null == fkafkaBrokers) { + "kafka.metadata.broker.list"); + if (null == fkafkaBrokers) { fkafkaBrokers = "localhost:9092"; } + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); if(Utils.isCadiEnabled()){ props.putAll(Utils.addSaslProps()); } - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - fKafkaAdminClient=AdminClient.create ( props ); - - - } - public DMaaPKafkaMetaBroker( rrNvReadable settings, - ZkClient zk, ConfigDb configDb,AdminClient client) { - + public DMaaPKafkaMetaBroker(ZkClient zk, ConfigDb configDb,AdminClient client) { fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); fKafkaAdminClient= client; - - - } @Override public List getAllTopics() throws ConfigDbException { log.info("Retrieving list of all the topics."); - final LinkedList result = new LinkedList<>(); + if (!GET_TOPICS_FROM_ZK) { + return getTopicsFromKafka(); + } + return getTopicsFromZookeeper(); + } + + private LinkedList getTopicsFromKafka() throws ConfigDbException { + LinkedList res = new LinkedList<>(); + final ListTopicsResult ltr = fKafkaAdminClient.listTopics(); try { - log.info("Retrieving all topics from root: " + zkTopicsRoot); - final List topics = fZk.getChildren(zkTopicsRoot); + for (String name: ltr.names().get()) { + res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData)); + } + } catch (InterruptedException | ExecutionException e) { + log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from kafka.", e); + } + return res; + } + + private LinkedList getTopicsFromZookeeper() throws ConfigDbException { + final LinkedList legacyResult = new LinkedList<>(); + try { + log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT); + final List topics = fZk.getChildren(ZK_TOPICS_ROOT); for (String topic : topics) { - result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); + legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); } JSONObject dataObj = new JSONObject(); dataObj.put("topics", new JSONObject()); @@ -148,17 +163,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } } catch (ZkNoNodeException excp) { // very fresh kafka doesn't have any topics or a topics node - log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp); + log.error("ZK doesn't have a Kakfa topics node at " + ZK_TOPICS_ROOT, excp); } - return result; + return legacyResult; } @Override public Topic getTopic(String topic) throws ConfigDbException { - if (fZk.exists(zkTopicsRoot + "/" + topic)) { + if (!GET_TOPICS_FROM_ZK) { + try { + for (String name : fKafkaAdminClient.listTopics().names().get()) { + if (name.equals(topic)) { + log.debug("TOPIC_NAME: {} is equal to : {}", name, topic); + return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic); + } + } + } catch (InterruptedException | ExecutionException e) { + log.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e); + return null; + } + } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) { return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic); } - // else: no such topic in kafka + // else: no such topic return null; } @@ -180,42 +207,38 @@ public class DMaaPKafkaMetaBroker implements Broker1 { */ @Override public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas, - boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { - log.info("Creating topic: " + topic); + boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { + log.info("Creating topic: {}", topic); try { - log.info("Check if topic [" + topic + "] exist."); + log.info("Check if topic [{}] exist.", topic); // first check for existence "our way" final Topic t = getTopic(topic); if (t != null) { - log.info("Could not create topic [" + topic + "]. Topic Already exists."); + log.info("Could not create topic [{}]. Topic Already exists.", topic); throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists."); } } catch (ConfigDbException e1) { log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1); throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "Couldn't check topic data in config db."); + "Couldn't check topic data in config db."); } // we only allow 3 replicas. (If we don't test this, we get weird // results from the cluster, // so explicit test and fail.) if (replicas < 1 || replicas > 3) { - log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3."); + log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic); throw new CambriaApiException(HttpStatusCodes.k400_badRequest, - "The replica count must be between 1 and 3."); + "The replica count must be between 1 and 3."); } if (partitions < 1) { - log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1."); + log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic); throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1."); } - // create via kafka - try { - final NewTopic topicRequest = - new NewTopic(topic, partitions, (short)replicas); - final CreateTopicsResult ctr = - fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); + final NewTopic topicRequest = new NewTopic(topic, partitions, (short)replicas); + final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); final KafkaFuture ctrResult = ctr.all(); ctrResult.get(); // underlying Kafka topic created. now setup our API info @@ -232,16 +255,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException { - log.info("Deleting topic: " + topic); + log.info("Deleting topic: {}", topic); try { log.info("Loading zookeeper client for topic deletion."); // topic creation. (Otherwise, the topic is only partially created // in ZK.) - - fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); throw new ConfigDbException(e); @@ -250,13 +270,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } } - private final ZkClient fZk; - private final ConfigDb fCambriaConfig; - private final ConfigPath fBaseTopicData; - - private static final String zkTopicsRoot = "/brokers/topics"; - private static final JSONObject kEmptyAcl = new JSONObject(); - /** * method Providing KafkaTopic Object associated with owner and * transactionenabled or not @@ -269,7 +282,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @throws ConfigDbException */ public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled) - throws ConfigDbException { + throws ConfigDbException { return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled); } @@ -286,18 +299,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @throws ConfigDbException */ public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner, - boolean transactionEnabled) throws ConfigDbException { + boolean transactionEnabled) throws ConfigDbException { final JSONObject o = new JSONObject(); o.put("owner", owner); o.put("description", desc); o.put("txenabled", transactionEnabled); - db.store(basePath.getChild(name), o.toString()); + if (GET_TOPICS_FROM_ZK) { + db.store(basePath.getChild(name), o.toString()); + } return new KafkaTopic(name, db, basePath); } /** - * class performing all user opearation like user is eligible to read, - * write. permitting a user to write and read, + * class performing all user operation like user is eligible to read, + * write. permitting a user to write and read etc * * @author anowarul.islam * @@ -311,6 +326,16 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @param baseTopic * @throws ConfigDbException */ + + private final String fName; + private final ConfigDb fConfigDb; + private final ConfigPath fBaseTopicData; + private final String fOwner; + private final String fDesc; + private final NsaAcl fReaders; + private final NsaAcl fWriters; + private final boolean fTransactionEnabled; + public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException { fName = name; fConfigDb = configdb; @@ -396,7 +421,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void permitWritesFromUser(String pubId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { + throws ConfigDbException, AccessDeniedException { updateAcl(asUser, false, true, pubId); } @@ -407,22 +432,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void permitReadsByUser(String consumerId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { + throws ConfigDbException, AccessDeniedException { updateAcl(asUser, true, true, consumerId); } @Override public void denyReadsByUser(String consumerId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { + throws ConfigDbException, AccessDeniedException { updateAcl(asUser, true, false, consumerId); } private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key) - throws ConfigDbException, AccessDeniedException{ - try - { + throws ConfigDbException, AccessDeniedException{ + try { final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add ); - // we have to assume we have current data, or load it again. for the expected use // case, assuming we can overwrite the data is fine. final JSONObject o = new JSONObject (); @@ -432,10 +455,8 @@ public class DMaaPKafkaMetaBroker implements Broker1 { fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () ); log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); - - } - catch ( ConfigDbException | AccessDeniedException x ) - { + } catch ( ConfigDbException | AccessDeniedException x ) { + log.info("Error when trying to update acl for key {}", key); throw x; } @@ -445,15 +466,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 { return acl == null ? null : acl.serialize(); } - private final String fName; - private final ConfigDb fConfigDb; - private final ConfigPath fBaseTopicData; - private final String fOwner; - private final String fDesc; - private final NsaAcl fReaders; - private final NsaAcl fWriters; - private boolean fTransactionEnabled; - public boolean isTransactionEnabled() { return fTransactionEnabled; }