From e6afd43dbc9d4f5979bbc9dc1309d826aa8cf58d Mon Sep 17 00:00:00 2001 From: efiacor Date: Tue, 22 Mar 2022 15:45:45 +0000 Subject: [PATCH] [DMAAP-MR] Get topics from kafka option Signed-off-by: efiacor Change-Id: I8e21d249517f67ef2cbfe5511178e38b357f3d29 Issue-ID: DMAAP-1727 --- pom.xml | 2 +- .../ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy | 22 +-- .../ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml | 109 ++++++++----- .../dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java | 176 +++++++++++---------- .../mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java | 15 +- version.properties | 4 +- 6 files changed, 178 insertions(+), 150 deletions(-) diff --git a/pom.xml b/pom.xml index e66feda..9a3f661 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ org.onap.dmaap.messagerouter.messageservice dmaapMR1 - 1.3.2-SNAPSHOT + 1.4.0-SNAPSHOT dmaap-messagerouter-messageservice Message Router - Restful interface built for kafka diff --git a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy index 0841a3a..e09c538 100644 --- a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy +++ b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy @@ -3,20 +3,20 @@ xmlns jaxrs: "http://cxf.apache.org/jaxrs" xmlns util: "http://www.springframework.org/schema/util" - echoService(org.onap.dmaap.JaxrsEchoService) - userService(org.onap.dmaap.JaxrsUserService) +// echoService(org.onap.dmaap.JaxrsEchoService) +// userService(org.onap.dmaap.JaxrsUserService) topicService(org.onap.dmaap.service.TopicRestService) eventService(org.onap.dmaap.service.EventsRestService) - adminService(org.onap.dmaap.service.AdminRestService) +// adminService(org.onap.dmaap.service.AdminRestService) apiKeyService(org.onap.dmaap.service.ApiKeysRestService) metricsService(org.onap.dmaap.service.MetricsRestService) - transactionService(org.onap.dmaap.service.TransactionRestService) - UIService(org.onap.dmaap.service.UIRestServices) - mirrorService(org.onap.dmaap.service.MMRestService) +// transactionService(org.onap.dmaap.service.TransactionRestService) +// UIService(org.onap.dmaap.service.UIRestServices) +// mirrorService(org.onap.dmaap.service.MMRestService) - util.list(id: 'jaxrsServices') { - ref(bean:'echoService') - ref(bean:'userService') - - } +// util.list(id: 'jaxrsServices') { +// ref(bean:'echoService') +// ref(bean:'userService') +// +// } } \ No newline at end of file diff --git a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml index 090a76b..20e4ceb 100644 --- a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml +++ b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml @@ -5,45 +5,68 @@ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> - + - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + - - - --> + + - - - - + + @@ -60,7 +83,7 @@ class="com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter" /> - + @@ -71,7 +94,7 @@ - + @@ -82,13 +105,13 @@ --> - - + + - + @@ -125,9 +148,9 @@ - - - - + + + + diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 7a08345..ae7414e 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -36,6 +36,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; import org.json.JSONArray; @@ -44,7 +45,6 @@ import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.constants.CambriaConstants; import org.onap.dmaap.dmf.mr.metabroker.Broker1; import org.onap.dmaap.dmf.mr.metabroker.Topic; -import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; import org.onap.dmaap.dmf.mr.utils.Utils; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.StringUtils; @@ -62,31 +62,35 @@ import java.util.concurrent.ExecutionException; //@Component public class DMaaPKafkaMetaBroker implements Broker1 { + private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class); + private final AdminClient fKafkaAdminClient; + private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(System.getenv(). + getOrDefault("useZkTopicStore", "true")); + private final ZkClient fZk; + private final ConfigDb fCambriaConfig; + private final ConfigPath fBaseTopicData; + private static final String ZK_TOPICS_ROOT = "/brokers/topics"; + private static final JSONObject kEmptyAcl = new JSONObject(); + public DMaaPKafkaMetaBroker() { fZk = null; fCambriaConfig = null; fBaseTopicData = null; final Properties props = new Properties (); String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "kafka.metadata.broker.list"); - if (StringUtils.isEmpty(fkafkaBrokers)) { + "kafka.metadata.broker.list"); + if (StringUtils.isEmpty(fkafkaBrokers)) { fkafkaBrokers = "localhost:9092"; } - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + if(Utils.isCadiEnabled()){ props.putAll(Utils.addSaslProps()); } fKafkaAdminClient=AdminClient.create ( props ); - } - private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); - private final AdminClient fKafkaAdminClient; - - - /** * DMaaPKafkaMetaBroker constructor initializing * @@ -95,50 +99,61 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @param configDb */ public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings, - @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { + @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); final Properties props = new Properties (); String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "kafka.metadata.broker.list"); - if (null == fkafkaBrokers) { + "kafka.metadata.broker.list"); + if (null == fkafkaBrokers) { fkafkaBrokers = "localhost:9092"; } + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); if(Utils.isCadiEnabled()){ props.putAll(Utils.addSaslProps()); } - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - fKafkaAdminClient=AdminClient.create ( props ); - - - } - public DMaaPKafkaMetaBroker( rrNvReadable settings, - ZkClient zk, ConfigDb configDb,AdminClient client) { - + public DMaaPKafkaMetaBroker(ZkClient zk, ConfigDb configDb,AdminClient client) { fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); fKafkaAdminClient= client; - - - } @Override public List getAllTopics() throws ConfigDbException { log.info("Retrieving list of all the topics."); - final LinkedList result = new LinkedList<>(); + if (!GET_TOPICS_FROM_ZK) { + return getTopicsFromKafka(); + } + return getTopicsFromZookeeper(); + } + + private LinkedList getTopicsFromKafka() throws ConfigDbException { + LinkedList res = new LinkedList<>(); + final ListTopicsResult ltr = fKafkaAdminClient.listTopics(); try { - log.info("Retrieving all topics from root: " + zkTopicsRoot); - final List topics = fZk.getChildren(zkTopicsRoot); + for (String name: ltr.names().get()) { + res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData)); + } + } catch (InterruptedException | ExecutionException e) { + log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from kafka.", e); + } + return res; + } + + private LinkedList getTopicsFromZookeeper() throws ConfigDbException { + final LinkedList legacyResult = new LinkedList<>(); + try { + log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT); + final List topics = fZk.getChildren(ZK_TOPICS_ROOT); for (String topic : topics) { - result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); + legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); } JSONObject dataObj = new JSONObject(); dataObj.put("topics", new JSONObject()); @@ -148,17 +163,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } } catch (ZkNoNodeException excp) { // very fresh kafka doesn't have any topics or a topics node - log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp); + log.error("ZK doesn't have a Kakfa topics node at " + ZK_TOPICS_ROOT, excp); } - return result; + return legacyResult; } @Override public Topic getTopic(String topic) throws ConfigDbException { - if (fZk.exists(zkTopicsRoot + "/" + topic)) { + if (!GET_TOPICS_FROM_ZK) { + try { + for (String name : fKafkaAdminClient.listTopics().names().get()) { + if (name.equals(topic)) { + log.debug("TOPIC_NAME: {} is equal to : {}", name, topic); + return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic); + } + } + } catch (InterruptedException | ExecutionException e) { + log.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e); + return null; + } + } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) { return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic); } - // else: no such topic in kafka + // else: no such topic return null; } @@ -180,42 +207,38 @@ public class DMaaPKafkaMetaBroker implements Broker1 { */ @Override public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas, - boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { - log.info("Creating topic: " + topic); + boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { + log.info("Creating topic: {}", topic); try { - log.info("Check if topic [" + topic + "] exist."); + log.info("Check if topic [{}] exist.", topic); // first check for existence "our way" final Topic t = getTopic(topic); if (t != null) { - log.info("Could not create topic [" + topic + "]. Topic Already exists."); + log.info("Could not create topic [{}]. Topic Already exists.", topic); throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists."); } } catch (ConfigDbException e1) { log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1); throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "Couldn't check topic data in config db."); + "Couldn't check topic data in config db."); } // we only allow 3 replicas. (If we don't test this, we get weird // results from the cluster, // so explicit test and fail.) if (replicas < 1 || replicas > 3) { - log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3."); + log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic); throw new CambriaApiException(HttpStatusCodes.k400_badRequest, - "The replica count must be between 1 and 3."); + "The replica count must be between 1 and 3."); } if (partitions < 1) { - log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1."); + log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic); throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1."); } - // create via kafka - try { - final NewTopic topicRequest = - new NewTopic(topic, partitions, (short)replicas); - final CreateTopicsResult ctr = - fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); + final NewTopic topicRequest = new NewTopic(topic, partitions, (short)replicas); + final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); final KafkaFuture ctrResult = ctr.all(); ctrResult.get(); // underlying Kafka topic created. now setup our API info @@ -232,16 +255,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException { - log.info("Deleting topic: " + topic); + log.info("Deleting topic: {}", topic); try { log.info("Loading zookeeper client for topic deletion."); // topic creation. (Otherwise, the topic is only partially created // in ZK.) - - fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); throw new ConfigDbException(e); @@ -250,13 +270,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } } - private final ZkClient fZk; - private final ConfigDb fCambriaConfig; - private final ConfigPath fBaseTopicData; - - private static final String zkTopicsRoot = "/brokers/topics"; - private static final JSONObject kEmptyAcl = new JSONObject(); - /** * method Providing KafkaTopic Object associated with owner and * transactionenabled or not @@ -269,7 +282,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @throws ConfigDbException */ public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled) - throws ConfigDbException { + throws ConfigDbException { return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled); } @@ -286,18 +299,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @throws ConfigDbException */ public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner, - boolean transactionEnabled) throws ConfigDbException { + boolean transactionEnabled) throws ConfigDbException { final JSONObject o = new JSONObject(); o.put("owner", owner); o.put("description", desc); o.put("txenabled", transactionEnabled); - db.store(basePath.getChild(name), o.toString()); + if (GET_TOPICS_FROM_ZK) { + db.store(basePath.getChild(name), o.toString()); + } return new KafkaTopic(name, db, basePath); } /** - * class performing all user opearation like user is eligible to read, - * write. permitting a user to write and read, + * class performing all user operation like user is eligible to read, + * write. permitting a user to write and read etc * * @author anowarul.islam * @@ -311,6 +326,16 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @param baseTopic * @throws ConfigDbException */ + + private final String fName; + private final ConfigDb fConfigDb; + private final ConfigPath fBaseTopicData; + private final String fOwner; + private final String fDesc; + private final NsaAcl fReaders; + private final NsaAcl fWriters; + private final boolean fTransactionEnabled; + public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException { fName = name; fConfigDb = configdb; @@ -396,7 +421,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void permitWritesFromUser(String pubId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { + throws ConfigDbException, AccessDeniedException { updateAcl(asUser, false, true, pubId); } @@ -407,22 +432,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 { @Override public void permitReadsByUser(String consumerId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { + throws ConfigDbException, AccessDeniedException { updateAcl(asUser, true, true, consumerId); } @Override public void denyReadsByUser(String consumerId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { + throws ConfigDbException, AccessDeniedException { updateAcl(asUser, true, false, consumerId); } private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key) - throws ConfigDbException, AccessDeniedException{ - try - { + throws ConfigDbException, AccessDeniedException{ + try { final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add ); - // we have to assume we have current data, or load it again. for the expected use // case, assuming we can overwrite the data is fine. final JSONObject o = new JSONObject (); @@ -432,10 +455,8 @@ public class DMaaPKafkaMetaBroker implements Broker1 { fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () ); log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); - - } - catch ( ConfigDbException | AccessDeniedException x ) - { + } catch ( ConfigDbException | AccessDeniedException x ) { + log.info("Error when trying to update acl for key {}", key); throw x; } @@ -445,15 +466,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 { return acl == null ? null : acl.serialize(); } - private final String fName; - private final ConfigDb fConfigDb; - private final ConfigPath fBaseTopicData; - private final String fOwner; - private final String fDesc; - private final NsaAcl fReaders; - private final NsaAcl fWriters; - private boolean fTransactionEnabled; - public boolean isTransactionEnabled() { return fTransactionEnabled; } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java index d59c839..1c717ca 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue; import com.att.nsa.configs.ConfigDb; import com.att.nsa.configs.ConfigDbException; import com.att.nsa.configs.ConfigPath; +import java.util.Properties; +import org.mockito.ArgumentMatchers; import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.clients.admin.AdminClient; import org.junit.Before; @@ -47,15 +49,13 @@ import org.powermock.modules.junit4.PowerMockRunner; @PrepareForTest({ AdminClient.class }) public class DMaaPKafkaMetaBrokerTest { - @InjectMocks + @Mock private DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker; @Mock private ZkClient fZk; @Mock private AdminClient fKafkaAdminClient; @Mock - private AdminClient client; - @Mock private ConfigDb configDb; @Mock ConfigPath fBaseTopicData; @@ -68,18 +68,13 @@ public class DMaaPKafkaMetaBrokerTest { public void setUp() { MockitoAnnotations.initMocks(this); PowerMockito.mockStatic(AdminClient.class); - // PowerMockito.when(AdminClient.create (any(Properties.class) - // )).thenReturn(fKafkaAdminClient); - - // PowerMockito.mockStatic(AdminUtils.class); + PowerMockito.when(AdminClient.create(ArgumentMatchers.any(Properties.class))).thenReturn(fKafkaAdminClient); PowerMockito.when(configDb.parse("/topics")).thenReturn(fBaseTopicData); - } @Test public void testBrokercreate() { DMaaPKafkaMetaBroker broker = new DMaaPKafkaMetaBroker(); - } @Test @@ -90,7 +85,6 @@ public class DMaaPKafkaMetaBrokerTest { } catch (Exception e) { assertTrue(true); } - } @Test @@ -101,7 +95,6 @@ public class DMaaPKafkaMetaBrokerTest { // TODO Auto-generated catch block e.printStackTrace(); } - } @Test diff --git a/version.properties b/version.properties index 67a9369..4b00b94 100644 --- a/version.properties +++ b/version.properties @@ -26,8 +26,8 @@ # because they are used in Jenkins, whose plug-in doesn't support major=1 -minor=3 -patch=2 +minor=4 +patch=0 base_version=${major}.${minor}.${patch} -- 2.16.6