[DMAAP-MR] Get topics from kafka option 75/127975/4 1.4.0
authorefiacor <fiachra.corcoran@est.tech>
Tue, 22 Mar 2022 15:45:45 +0000 (15:45 +0000)
committerefiacor <fiachra.corcoran@est.tech>
Wed, 23 Mar 2022 12:40:33 +0000 (12:40 +0000)
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: I8e21d249517f67ef2cbfe5511178e38b357f3d29
Issue-ID: DMAAP-1727

pom.xml
src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy
src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml
src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java
version.properties

diff --git a/pom.xml b/pom.xml
index e66feda..9a3f661 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
 
        <groupId>org.onap.dmaap.messagerouter.messageservice</groupId>
        <artifactId>dmaapMR1</artifactId>
-       <version>1.3.2-SNAPSHOT</version>
+       <version>1.4.0-SNAPSHOT</version>
        <name>dmaap-messagerouter-messageservice</name>
        <description>Message Router - Restful interface built for kafka</description>
        <licenses>
index 0841a3a..e09c538 100644 (file)
@@ -3,20 +3,20 @@
        xmlns jaxrs: "http://cxf.apache.org/jaxrs"
        xmlns util: "http://www.springframework.org/schema/util"
        
-       echoService(org.onap.dmaap.JaxrsEchoService)
-       userService(org.onap.dmaap.JaxrsUserService)
+//     echoService(org.onap.dmaap.JaxrsEchoService)
+//     userService(org.onap.dmaap.JaxrsUserService)
        topicService(org.onap.dmaap.service.TopicRestService)
        eventService(org.onap.dmaap.service.EventsRestService)
-       adminService(org.onap.dmaap.service.AdminRestService)
+//     adminService(org.onap.dmaap.service.AdminRestService)
        apiKeyService(org.onap.dmaap.service.ApiKeysRestService)
        metricsService(org.onap.dmaap.service.MetricsRestService)
-       transactionService(org.onap.dmaap.service.TransactionRestService)
-       UIService(org.onap.dmaap.service.UIRestServices)
-       mirrorService(org.onap.dmaap.service.MMRestService)
+//     transactionService(org.onap.dmaap.service.TransactionRestService)
+//     UIService(org.onap.dmaap.service.UIRestServices)
+//     mirrorService(org.onap.dmaap.service.MMRestService)
        
-       util.list(id: 'jaxrsServices') {
-               ref(bean:'echoService')
-               ref(bean:'userService')
-               
-       }
+//     util.list(id: 'jaxrsServices') {
+//             ref(bean:'echoService')
+//             ref(bean:'userService')
+//
+//     }
 }
\ No newline at end of file
index 090a76b..20e4ceb 100644 (file)
@@ -5,45 +5,68 @@
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
-       
+
        <!-- Dependency Injection with annotations -->
        <!-- <context:property-placeholder
                location="file:/C:/Users/su622b/Desktop/testonap.properties"/> -->
-               <!-- <context:property-placeholder
-               location="classpath:msgRtrApi.properties,classpath:DMaaPErrorMesaages.properties" /> -->
-               
-               <context:component-scan
-               base-package="org.onap.dmaap,org.onap.dmaap.filemonitor,org.onap.dmaap.mmagent,org.onap.dmaap.service,org.onap.dmaap.tools,org.onap.dmaap.util,org.onap.dmaap.filter,org.onap.dmaap.apiServer.metrics.cambria,
-       org.onap.dmaap.dmf.mr,org.onap.dmaap.dmf.mr.backends,org.onap.dmaap.dmf.mr.backends.kafka,org.onap.dmaap.dmf.mr.backends.memory,org.onap.dmaap.dmf.mr.beans,org.onap.dmaap.dmf.mr.constants,org.onap.dmaap.dmf.mr.exception,
-       org.onap.dmaap.dmf.mr.listener,org.onap.dmaap.dmf.mr.metabroker,org.onap.dmaap.dmf.mr.metrics.publisher,org.onap.dmaap.dmf.mr.metrics.publisher.impl,org.onap.dmaap.dmf.mr.resources,org.onap.dmaap.dmf.mr.resources.streamReaders,org.onap.dmaap.dmf.mr.security,
-       org.onap.dmaap.dmf.mr.security.impl,org.onap.dmaap.dmf.mr.service,org.onap.dmaap.dmf.mr.service.impl,org.onap.dmaap.dmf.mr.transaction,org.onap.dmaap.dmf.mr.transaction.impl,org.onap.dmaap.dmf.mr.utils,
-       com.att,com.att.dmf.mr.utils, com.att.dmf.mr, com.att.dmf.mr.rest,com.att.dmf.mr.service,
-       com.att.dmf.mr.service.impl,com.att.dmf.mr.beans,com.att.dmf.mr.security,com.att.dmf.mr.exception,com.att.dmf.mr.backends,com.att.dmf.mr.backends.kafka,
-       com.att.dmf.mr.transaction,com.att.dmf.mr.exception,com.att.nsa.dmaap,com.att.nsa.dmaap.service,com.att.nsa.dmaap.util,java.lang,java.util,com.att.dmf.mr.exception, com.att.dmf,com.att.nsa.dmaap.mmagent" />
-               <context:property-placeholder 
-               location="file:${AJSC_HOME}/bundleconfig/etc/appprops/MsgRtrApi.properties,file:${AJSC_HOME}/etc/DMaaPErrorMesaages.properties"/> 
-       
-               <bean id="jsonProvider" class="org.apache.cxf.jaxrs.provider.json.JSONProvider">
-                        <property name="dropRootElement" value="true" />
-               <property name="supportUnwrapped" value="true" />
-        </bean>
-               
-               <bean id="jacksonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider" />
-               
-               <bean id="cambriaExMapper" class="org.onap.dmaap.DMaaPCambriaExceptionMapper" />
-    
-       <bean id="webExMapper" class="org.onap.dmaap.DMaaPWebExceptionMapper" />
-    
-               
+       <!-- <context:property-placeholder
+  location="classpath:msgRtrApi.properties,classpath:DMaaPErrorMesaages.properties" /> -->
+
+       <context:component-scan
+               base-package="
+               org.onap.dmaap,org.onap.dmaap.filemonitor,
+               org.onap.dmaap.mmagent,
+               org.onap.dmaap.service,
+               org.onap.dmaap.tools,
+               org.onap.dmaap.util,
+               org.onap.dmaap.filter,
+               org.onap.dmaap.apiServer.metrics.cambria,
+               org.onap.dmaap.dmf.mr,
+               org.onap.dmaap.dmf.mr.backends,
+               org.onap.dmaap.dmf.mr.backends.kafka,
+               org.onap.dmaap.dmf.mr.backends.memory,
+               org.onap.dmaap.dmf.mr.beans,
+               org.onap.dmaap.dmf.mr.constants,
+               org.onap.dmaap.dmf.mr.exception,
+               org.onap.dmaap.dmf.mr.listener,
+               org.onap.dmaap.dmf.mr.metabroker,
+               org.onap.dmaap.dmf.mr.metrics.publisher,
+               org.onap.dmaap.dmf.mr.metrics.publisher.impl,
+               org.onap.dmaap.dmf.mr.resources,
+               org.onap.dmaap.dmf.mr.resources.streamReaders,
+               org.onap.dmaap.dmf.mr.security,
+               org.onap.dmaap.dmf.mr.security.impl,
+               org.onap.dmaap.dmf.mr.service,
+               org.onap.dmaap.dmf.mr.service.impl,
+               org.onap.dmaap.dmf.mr.transaction,
+               org.onap.dmaap.dmf.mr.transaction.impl,
+               org.onap.dmaap.dmf.mr.utils,
+               java.lang,
+               java.util" />
+       <context:property-placeholder
+               location="file:${AJSC_HOME}/bundleconfig/etc/appprops/MsgRtrApi.properties,file:${AJSC_HOME}/etc/DMaaPErrorMesaages.properties"/>
+
+       <bean id="jsonProvider" class="org.apache.cxf.jaxrs.provider.json.JSONProvider">
+               <property name="dropRootElement" value="true" />
+               <property name="supportUnwrapped" value="true" />
+       </bean>
+
+       <bean id="jacksonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider" />
+
+       <bean id="cambriaExMapper" class="org.onap.dmaap.DMaaPCambriaExceptionMapper" />
+
+       <bean id="webExMapper" class="org.onap.dmaap.DMaaPWebExceptionMapper" />
+
+
        <!-- Your bean definitions goes here -->
-<!--   <bean id="performanceLog" name="performanceLog" class="com.att.ajsc.csi.logging.PerformanceTracking" /> -->
-<!--   <bean id="processRestletHeaders" name="processRestletHeaders" class="ajsc.restlet.ProcessRestletHeaders" /> -->
-               <bean id="servicePropsBean" name="servicePropsBean" 
+       <!--    <bean id="performanceLog" name="performanceLog" class="com.att.ajsc.csi.logging.PerformanceTracking" /> -->
+       <!--    <bean id="processRestletHeaders" name="processRestletHeaders" class="ajsc.restlet.ProcessRestletHeaders" /> -->
+       <bean id="servicePropsBean" name="servicePropsBean"
                class="org.onap.dmaap.util.ServicePropertiesMapBean" />
-               
-               <!-- Msgrtr beans -->
-               <bean id="propertyReader" class="org.onap.dmaap.dmf.mr.utils.PropertyReader" />
-               <bean
+
+       <!-- Msgrtr beans -->
+       <bean id="propertyReader" class="org.onap.dmaap.dmf.mr.utils.PropertyReader" />
+       <bean
                class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
                <!-- Next value is the full qualified name of the static setter including 
                        method name -->
@@ -60,7 +83,7 @@
                class="com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter" />
 
        <bean id="dMaaPMetricsSet" class="org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet">
-                <constructor-arg ref="propertyReader" /> 
+               <constructor-arg ref="propertyReader" />
        </bean>
 
        <bean id="dMaaPZkClient" class=" org.onap.dmaap.dmf.mr.beans.DMaaPZkClient">
@@ -71,7 +94,7 @@
                <constructor-arg ref="dMaaPZkClient" />
                <constructor-arg ref="propertyReader" />
        </bean>
-       
+
 
        <bean id="kafkaPublisher" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher">
                <constructor-arg ref="propertyReader" />
                <constructor-arg ref="dMaaPMetricsSet" />
                <constructor-arg ref="kafkalockavoid" />
        </bean> -->
-       
-               <bean id="dMaaPKafkaConsumerFactory" class="org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory">
+
+       <bean id="dMaaPKafkaConsumerFactory" class="org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory">
                <constructor-arg ref="dMaaPMetricsSet" />
                <constructor-arg ref="curator" />
                <constructor-arg ref="kafkalockavoid" />
        </bean>
-       
+
 
        <bean id="curator" class="org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory"
                factory-method="getCurator">
        <bean id="defLength" class="org.onap.dmaap.mr.filter.DefaultLength">
                <property name="defaultLength" value="${maxcontentlength}"></property>
        </bean>
-       
-        <bean id="kafkalockavoid" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2" /> 
-       
 
-               <bean class="org.springframework.context.annotation.CommonAnnotationBeanPostProcessor"/>
+       <bean id="kafkalockavoid" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2" />
+
+
+       <bean class="org.springframework.context.annotation.CommonAnnotationBeanPostProcessor"/>
 </beans>               
index 7a08345..ae7414e 100644 (file)
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.KafkaFuture;
 import org.json.JSONArray;
@@ -44,7 +45,6 @@ import org.onap.dmaap.dmf.mr.CambriaApiException;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
 import org.onap.dmaap.dmf.mr.metabroker.Topic;
-import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
 import org.onap.dmaap.dmf.mr.utils.Utils;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.util.StringUtils;
@@ -62,31 +62,35 @@ import java.util.concurrent.ExecutionException;
 //@Component
 public class DMaaPKafkaMetaBroker implements Broker1 {
 
+       private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class);
+       private final AdminClient fKafkaAdminClient;
+       private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(System.getenv().
+               getOrDefault("useZkTopicStore", "true"));
+       private final ZkClient fZk;
+       private final ConfigDb fCambriaConfig;
+       private final ConfigPath fBaseTopicData;
+       private static final String ZK_TOPICS_ROOT = "/brokers/topics";
+       private static final JSONObject kEmptyAcl = new JSONObject();
+
        public DMaaPKafkaMetaBroker() {
                fZk = null;
                fCambriaConfig = null;
                fBaseTopicData = null;
                final Properties props = new Properties ();
                String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "kafka.metadata.broker.list");
-               if (StringUtils.isEmpty(fkafkaBrokers)) {
+                       "kafka.metadata.broker.list");
 
+               if (StringUtils.isEmpty(fkafkaBrokers)) {
                        fkafkaBrokers = "localhost:9092";
                }
-
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
                if(Utils.isCadiEnabled()){
                        props.putAll(Utils.addSaslProps());
                }
                fKafkaAdminClient=AdminClient.create ( props );
-
        }
 
-       private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
-       private final AdminClient fKafkaAdminClient;
-
-
-
        /**
         * DMaaPKafkaMetaBroker constructor initializing
         *
@@ -95,50 +99,61 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         * @param configDb
         */
        public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
-                                                               @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+               @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
                final Properties props = new Properties ();
                String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "kafka.metadata.broker.list");
-               if (null == fkafkaBrokers) {
+                       "kafka.metadata.broker.list");
 
+               if (null == fkafkaBrokers) {
                        fkafkaBrokers = "localhost:9092";
                }
+               props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
 
                if(Utils.isCadiEnabled()){
                        props.putAll(Utils.addSaslProps());
                }
-               props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-
                fKafkaAdminClient=AdminClient.create ( props );
-
-
-
        }
 
-       public DMaaPKafkaMetaBroker( rrNvReadable settings,
-                                                                ZkClient zk,  ConfigDb configDb,AdminClient client) {
-
+       public DMaaPKafkaMetaBroker(ZkClient zk,  ConfigDb configDb,AdminClient client) {
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
                fKafkaAdminClient= client;
-
-
-
        }
 
        @Override
        public List<Topic> getAllTopics() throws ConfigDbException {
                log.info("Retrieving list of all the topics.");
-               final LinkedList<Topic> result = new LinkedList<>();
+               if (!GET_TOPICS_FROM_ZK) {
+                       return getTopicsFromKafka();
+               }
+               return getTopicsFromZookeeper();
+       }
+
+       private LinkedList<Topic> getTopicsFromKafka() throws ConfigDbException {
+               LinkedList<Topic> res = new LinkedList<>();
+               final ListTopicsResult ltr = fKafkaAdminClient.listTopics();
                try {
-                       log.info("Retrieving all topics from root: " + zkTopicsRoot);
-                       final List<String> topics = fZk.getChildren(zkTopicsRoot);
+                       for (String name: ltr.names().get()) {
+                               res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData));
+                       }
+               } catch (InterruptedException | ExecutionException e) {
+                       log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from kafka.", e);
+               }
+               return res;
+       }
+
+       private LinkedList<Topic> getTopicsFromZookeeper() throws ConfigDbException {
+               final LinkedList<Topic> legacyResult = new LinkedList<>();
+               try {
+                       log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT);
+                       final List<String> topics = fZk.getChildren(ZK_TOPICS_ROOT);
                        for (String topic : topics) {
-                               result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
+                               legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
                        }
                        JSONObject dataObj = new JSONObject();
                        dataObj.put("topics", new JSONObject());
@@ -148,17 +163,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        }
                } catch (ZkNoNodeException excp) {
                        // very fresh kafka doesn't have any topics or a topics node
-                       log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
+                       log.error("ZK doesn't have a Kakfa topics node at " + ZK_TOPICS_ROOT, excp);
                }
-               return result;
+               return legacyResult;
        }
 
        @Override
        public Topic getTopic(String topic) throws ConfigDbException {
-               if (fZk.exists(zkTopicsRoot + "/" + topic)) {
+               if (!GET_TOPICS_FROM_ZK) {
+                       try {
+                               for (String name : fKafkaAdminClient.listTopics().names().get()) {
+                                       if (name.equals(topic)) {
+                                               log.debug("TOPIC_NAME: {} is equal to : {}", name, topic);
+                                               return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
+                                       }
+                               }
+                       } catch (InterruptedException | ExecutionException e) {
+                               log.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e);
+                               return null;
+                       }
+               } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) {
                        return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
                }
-               // else: no such topic in kafka
+               // else: no such topic
                return null;
        }
 
@@ -180,42 +207,38 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         */
        @Override
        public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
-                                                        boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
-               log.info("Creating topic: " + topic);
+               boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+               log.info("Creating topic: {}", topic);
                try {
-                       log.info("Check if topic [" + topic + "] exist.");
+                       log.info("Check if topic [{}] exist.", topic);
                        // first check for existence "our way"
                        final Topic t = getTopic(topic);
                        if (t != null) {
-                               log.info("Could not create topic [" + topic + "]. Topic Already exists.");
+                               log.info("Could not create topic [{}]. Topic Already exists.", topic);
                                throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
                        }
                } catch (ConfigDbException e1) {
                        log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
                        throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
-                                       "Couldn't check topic data in config db.");
+                               "Couldn't check topic data in config db.");
                }
 
                // we only allow 3 replicas. (If we don't test this, we get weird
                // results from the cluster,
                // so explicit test and fail.)
                if (replicas < 1 || replicas > 3) {
-                       log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
+                       log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic);
                        throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
-                                       "The replica count must be between 1 and 3.");
+                               "The replica count must be between 1 and 3.");
                }
                if (partitions < 1) {
-                       log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
+                       log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic);
                        throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
                }
-
                // create via kafka
-
                try {
-                       final NewTopic topicRequest =
-                                       new NewTopic(topic, partitions, (short)replicas);
-                       final CreateTopicsResult ctr =
-                                       fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+                       final NewTopic topicRequest = new NewTopic(topic, partitions, (short)replicas);
+                       final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
                        final KafkaFuture<Void> ctrResult = ctr.all();
                        ctrResult.get();
                        // underlying Kafka topic created. now setup our API info
@@ -232,16 +255,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
        @Override
        public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
-               log.info("Deleting topic: " + topic);
+               log.info("Deleting topic: {}", topic);
                try {
                        log.info("Loading zookeeper client for topic deletion.");
                        // topic creation. (Otherwise, the topic is only partially created
                        // in ZK.)
-
-
                        fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
                        log.info("Zookeeper client loaded successfully. Deleting topic.");
-
                } catch (Exception e) {
                        log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
                        throw new ConfigDbException(e);
@@ -250,13 +270,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                }
        }
 
-       private final ZkClient fZk;
-       private final ConfigDb fCambriaConfig;
-       private final ConfigPath fBaseTopicData;
-
-       private static final String zkTopicsRoot = "/brokers/topics";
-       private static final JSONObject kEmptyAcl = new JSONObject();
-
        /**
         * method Providing KafkaTopic Object associated with owner and
         * transactionenabled or not
@@ -269,7 +282,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         * @throws ConfigDbException
         */
        public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
-                       throws ConfigDbException {
+               throws ConfigDbException {
                return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
        }
 
@@ -286,18 +299,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
         * @throws ConfigDbException
         */
        public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
-                                                                                         boolean transactionEnabled) throws ConfigDbException {
+               boolean transactionEnabled) throws ConfigDbException {
                final JSONObject o = new JSONObject();
                o.put("owner", owner);
                o.put("description", desc);
                o.put("txenabled", transactionEnabled);
-               db.store(basePath.getChild(name), o.toString());
+               if (GET_TOPICS_FROM_ZK) {
+                       db.store(basePath.getChild(name), o.toString());
+               }
                return new KafkaTopic(name, db, basePath);
        }
 
        /**
-        * class performing all user opearation like user is eligible to read,
-        * write. permitting a user to write and read,
+        * class performing all user operation like user is eligible to read,
+        * write. permitting a user to write and read etc
         *
         * @author anowarul.islam
         *
@@ -311,6 +326,16 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                 * @param baseTopic
                 * @throws ConfigDbException
                 */
+
+               private final String fName;
+               private final ConfigDb fConfigDb;
+               private final ConfigPath fBaseTopicData;
+               private final String fOwner;
+               private final String fDesc;
+               private final NsaAcl fReaders;
+               private final NsaAcl fWriters;
+               private final boolean fTransactionEnabled;
+
                public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
                        fName = name;
                        fConfigDb = configdb;
@@ -396,7 +421,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                @Override
                public void permitWritesFromUser(String pubId, NsaApiKey asUser)
-                               throws ConfigDbException, AccessDeniedException {
+                       throws ConfigDbException, AccessDeniedException {
                        updateAcl(asUser, false, true, pubId);
                }
 
@@ -407,22 +432,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
 
                @Override
                public void permitReadsByUser(String consumerId, NsaApiKey asUser)
-                               throws ConfigDbException, AccessDeniedException {
+                       throws ConfigDbException, AccessDeniedException {
                        updateAcl(asUser, true, true, consumerId);
                }
 
                @Override
                public void denyReadsByUser(String consumerId, NsaApiKey asUser)
-                               throws ConfigDbException, AccessDeniedException {
+                       throws ConfigDbException, AccessDeniedException {
                        updateAcl(asUser, true, false, consumerId);
                }
 
                private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
-                               throws ConfigDbException, AccessDeniedException{
-                       try
-                       {
+                       throws ConfigDbException, AccessDeniedException{
+                       try {
                                final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
                                // we have to assume we have current data, or load it again. for the expected use
                                // case, assuming we can overwrite the data is fine.
                                final JSONObject o = new JSONObject ();
@@ -432,10 +455,8 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                                fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
 
                                log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
-                       }
-                       catch ( ConfigDbException | AccessDeniedException x )
-                       {
+                       } catch ( ConfigDbException | AccessDeniedException x ) {
+                               log.info("Error when trying to update acl for key {}", key);
                                throw x;
                        }
 
@@ -445,15 +466,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        return acl == null ? null : acl.serialize();
                }
 
-               private final String fName;
-               private final ConfigDb fConfigDb;
-               private final ConfigPath fBaseTopicData;
-               private final String fOwner;
-               private final String fDesc;
-               private final NsaAcl fReaders;
-               private final NsaAcl fWriters;
-               private boolean fTransactionEnabled;
-
                public boolean isTransactionEnabled() {
                        return fTransactionEnabled;
                }
index d59c839..1c717ca 100644 (file)
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
 import com.att.nsa.configs.ConfigDb;
 import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.configs.ConfigPath;
+import java.util.Properties;
+import org.mockito.ArgumentMatchers;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.junit.Before;
@@ -47,15 +49,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @PrepareForTest({ AdminClient.class })
 public class DMaaPKafkaMetaBrokerTest {
 
-       @InjectMocks
+       @Mock
        private DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker;
        @Mock
        private ZkClient fZk;
        @Mock
        private AdminClient fKafkaAdminClient;
        @Mock
-       private AdminClient client;
-       @Mock
        private ConfigDb configDb;
        @Mock
        ConfigPath fBaseTopicData;
@@ -68,18 +68,13 @@ public class DMaaPKafkaMetaBrokerTest {
        public void setUp() {
                MockitoAnnotations.initMocks(this);
                PowerMockito.mockStatic(AdminClient.class);
-               // PowerMockito.when(AdminClient.create (any(Properties.class)
-               // )).thenReturn(fKafkaAdminClient);
-
-               // PowerMockito.mockStatic(AdminUtils.class);
+               PowerMockito.when(AdminClient.create(ArgumentMatchers.any(Properties.class))).thenReturn(fKafkaAdminClient);
                PowerMockito.when(configDb.parse("/topics")).thenReturn(fBaseTopicData);
-
        }
 
        @Test
        public void testBrokercreate() {
                DMaaPKafkaMetaBroker broker = new DMaaPKafkaMetaBroker();
-
        }
 
        @Test
@@ -90,7 +85,6 @@ public class DMaaPKafkaMetaBrokerTest {
                } catch (Exception e) {
                        assertTrue(true);
                }
-
        }
 
        @Test
@@ -101,7 +95,6 @@ public class DMaaPKafkaMetaBrokerTest {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
-
        }
 
        @Test
index 67a9369..4b00b94 100644 (file)
@@ -26,8 +26,8 @@
 # because they are used in Jenkins, whose plug-in doesn't support
 
 major=1
-minor=3
-patch=2
+minor=4
+patch=0
 
 base_version=${major}.${minor}.${patch}