Merge "Removing commented line"
authorRam Koya <rk541m@att.com>
Tue, 11 Sep 2018 16:12:25 +0000 (16:12 +0000)
committerGerrit Code Review <gerrit@onap.org>
Tue, 11 Sep 2018 16:12:25 +0000 (16:12 +0000)
43 files changed:
pom.xml
src/main/java/com/att/dmf/mr/CambriaApiException.java
src/main/java/com/att/dmf/mr/backends/Consumer.java
src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java
src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java
src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java
src/main/java/com/att/dmf/mr/metabroker/Topic.java
src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java
src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java
src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java
src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java
src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java
src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java
src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java
src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java
src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java
src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
src/main/java/com/att/dmf/mr/utils/PropertyReader.java
src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java
src/main/java/com/att/mr/filter/ContentLengthFilter.java
src/test/java/com/att/nsa/cambria/backends/kafka/CuratorFrameworkImpl.java
version.properties

diff --git a/pom.xml b/pom.xml
index d792506..a2f7e7a 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.onap.dmaap.messagerouter.msgrtr</groupId>
        <artifactId>msgrtr</artifactId>
-       <version>1.1.5-SNAPSHOT</version>
+       <version>1.1.7-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>dmaap-messagerouter-msgrtr</name>
        <description>Message Router - Restful interface built for kafka</description>
                <dependency>
                        <groupId>org.codehaus.groovy</groupId>
                        <artifactId>groovy-all</artifactId>
-                       <version>2.4.4</version>
+                       <version>2.4.8</version>
                        <scope>compile</scope>
                        <exclusions>
                                <exclusion>
                        <artifactId>cxf-rt-transports-http-jetty</artifactId> <version>${cxf.version}</version> 
                        </dependency> -->
                <!-- Begin - Spring Dependencies for DI -->
-               <dependency>
-                       <groupId>org.springframework</groupId>
-                       <artifactId>spring-core</artifactId>
-                       <version>${spring.version}</version>
-               </dependency>
                <dependency>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-context</artifactId>
                <dependency>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-expression</artifactId>
-                       <version>4.3.16.RELEASE</version>
+                       <version>4.3.18.RELEASE</version>
                </dependency>
                <dependency>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-webmvc</artifactId>
-                       <version>4.3.15.RELEASE</version>
+                       <version>4.3.18.RELEASE</version>
                </dependency>
                <dependency>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-core</artifactId>
-                       <version>4.3.15.RELEASE</version>
+                       <version>4.3.18.RELEASE</version>
                </dependency>
                <dependency>
                        <groupId>javax.servlet</groupId>
                <dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka_2.11</artifactId>
-               <version>0.11.0.0</version>
+               <version>0.11.0.3</version>
                </dependency>
+       <dependency>
+               <groupId>com.google.guava</groupId>
+               <artifactId>guava</artifactId>
+               <version>23.6.1-jre</version>
+       </dependency>
 
                <dependency>
                        <groupId>com.att.eelf</groupId>
                <dependency>
                        <groupId>org.apache.curator</groupId>
                        <artifactId>curator-recipes</artifactId>
-                       <version>2.6.0</version>
+                       <version>4.0.1</version>
                </dependency>
 
                <dependency>
                        <groupId>org.apache.curator</groupId>
                        <artifactId>curator-test</artifactId>
-                       <version>2.6.0</version>
+                       <version>4.0.1</version>
                </dependency>
 
 
index 84dd32c..cdf95ab 100644 (file)
@@ -28,8 +28,12 @@ import com.att.nsa.apiServer.NsaAppException;
 
 public class CambriaApiException extends NsaAppException
 {
+       /*
+        * defined long type constant serialVersionUID
+        */
+       private static final long serialVersionUID = 1L;
        
-       private ErrorResponse errRes;
+       private transient ErrorResponse errRes;
        /**
         * Implements constructor CambriaApiException
         * @param jsonObject
@@ -66,10 +70,6 @@ public class CambriaApiException extends NsaAppException
                this.errRes = errRes;
        }
        
-       /*
-        * defined long type constant serialVersionUID
-        */
-       private static final long serialVersionUID = 1L;
        public ErrorResponse getErrRes() {
                return errRes;
        }
index 2743fc3..f4a9a80 100644 (file)
@@ -21,7 +21,6 @@
  *******************************************************************************/
 package com.att.dmf.mr.backends;
 
-import java.util.ArrayList;
 
 /**
  * A consumer interface. Consumers pull the next message from a given topic.
index 126711a..83c08ec 100644 (file)
@@ -61,8 +61,8 @@ import com.att.dmf.mr.backends.MetricsSet;
 import com.att.dmf.mr.constants.CambriaConstants;
 import com.att.dmf.mr.exception.DMaaPErrorMessages;
 import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.metrics.CdmTimer;
@@ -101,7 +101,7 @@ public class KafkaConsumerCache {
        // the server at least every 30 seconds, timing out after 2 minutes should
        // be okay.
        // FIXME: consider allowing the client to specify its expected call rate?
-       private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
+       private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
 
        // check for expirations pretty regularly
        private static final long kDefault_SweepEverySeconds = 15;
@@ -110,16 +110,13 @@ public class KafkaConsumerCache {
                NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
        }
 
-       // @Qualifier("kafkalockavoid")
-
-       // @Resource
-       // @Qualifier("kafkalockavoid")
-       // KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+       
+       
 
        @Autowired
        private DMaaPErrorMessages errorMessages;
 
-       // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider();
+       
        /**
         * User defined exception class for kafka consumer cache
         * 
@@ -267,8 +264,8 @@ public class KafkaConsumerCache {
                        EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
                        ensurePath.ensure(curator.getZookeeperClient());
 
-                       // final long freq = fSettings.getLong(kSetting_SweepEverySeconds,
-                       // kDefault_SweepEverySeconds);
+                       
+                       
                        long freq = kDefault_SweepEverySeconds;
                        String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                                        kSetting_SweepEverySeconds);
@@ -301,8 +298,8 @@ public class KafkaConsumerCache {
                        try {
                                curator.blockUntilConnected();
                        } catch (InterruptedException e) {
-                               // Ignore
-                               log.error("error while setting curator framework :" + e.getMessage());
+                               log.error("error while setting curator framework :",e);
+                               Thread.currentThread().interrupt();
                        }
                }
 
@@ -393,8 +390,8 @@ public class KafkaConsumerCache {
                if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
                        throw new KafkaConsumerCacheException("The cache service is unavailable.");
                ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
-               // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
-               // clientId);
+               
+               
                Enumeration<String> strEnum = fConsumers.keys();
                String consumerLocalKey = null;
                while (strEnum.hasMoreElements()) {
@@ -402,9 +399,9 @@ public class KafkaConsumerCache {
 
                        if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
 
-                               // System.out.println("consumer key returning from
-                               // getConsumerListforCG +++++++++ " + consumerLocalKey
-                               // + " " + fConsumers.get(consumerLocalKey));
+                               
+                               
+                               
                                kcl.add(fConsumers.get(consumerLocalKey));
 
                        }
@@ -417,8 +414,7 @@ public class KafkaConsumerCache {
                if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
                        throw new KafkaConsumerCacheException("The cache service is unavailable.");
                ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
-               // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
-               // clientId);
+               
                Enumeration<String> strEnum = fConsumers.keys();
                String consumerLocalKey = null;
                while (strEnum.hasMoreElements()) {
@@ -426,9 +422,7 @@ public class KafkaConsumerCache {
 
                        if (consumerLocalKey.startsWith(group)) {
 
-                               // System.out.println("consumer key returning from
-                               // getConsumerListforCG +++++++++ " + consumerLocalKey
-                               // + " " + fConsumers.get(consumerLocalKey));
+                               
                                kcl.add(fConsumers.get(consumerLocalKey));
 
                        }
@@ -454,7 +448,7 @@ public class KafkaConsumerCache {
                final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
                fConsumers.put(consumerKey, consumer);
 
-               // String appId = "node-instance-"+i;
+               
 
                log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
        }
@@ -517,7 +511,8 @@ public class KafkaConsumerCache {
                                consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
                        Thread.sleep(consumerHandoverWaitMs);
                } catch (InterruptedException e) {
-                       // Ignore
+                       log.error("InterruptedException in dropTimedOutConsumer",e);
+                       Thread.currentThread().interrupt();
                }
                log.info("Dropped " + key + " consumer due to timeout");
        }
@@ -549,7 +544,7 @@ public class KafkaConsumerCache {
                final Kafka011Consumer kc = fConsumers.get(key);
                log.info("closing Kafka consumer " + key + " object " + kc);
                if (kc != null) {
-                       // log.info("closing Kafka consumer " + key);
+                       
                        if (kc.close()) {
                                fConsumers.remove(key);
 
@@ -644,9 +639,8 @@ public class KafkaConsumerCache {
                        throws KafkaConsumerCacheException {
                // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
                final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
-               final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership");
 
-               try {
+               try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
                        final String consumerPath = fBaseZkPath + "/" + consumerKey;
                        log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
                        final CuratorFramework curator = ConfigurationReader.getCurator();
@@ -673,7 +667,8 @@ public class KafkaConsumerCache {
                                consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
                        Thread.sleep(consumerHandoverWaitMs);
                } catch (InterruptedException e) {
-                       // Ignore
+                       log.error("InterruptedException in signalOwnership",e);
+                       Thread.currentThread().interrupt();
                }
        }
 
@@ -690,8 +685,7 @@ public class KafkaConsumerCache {
                        mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
                }
 
-               // final long mustTouchEveryMs =
-               // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs);
+               
                final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
 
                for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
@@ -744,6 +738,5 @@ public class KafkaConsumerCache {
        }
 
        private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
-       // private static final Logger log =
-       // LoggerFactory.getLogger(KafkaConsumerCache.class);
+       
 }
\ No newline at end of file
index 805701a..f521b41 100644 (file)
@@ -35,9 +35,7 @@ import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.ComponentScan;
 import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
 
 //@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka")
 @Component
@@ -57,7 +55,7 @@ public class KafkaLiveLockAvoider2 {
        
    @PostConstruct
        public void init() {
-       System.out.println("Welcome......................................................................................");
+        log.info("Welcome......................................................................................");
        try {
                if (curatorFramework.checkExists().forPath(locksPath) == null) {
                        curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath);
@@ -67,7 +65,7 @@ public class KafkaLiveLockAvoider2 {
                }
                
        } catch (Exception e) {
-               //e.printStackTrace();
+               
                log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e);
                
        }
@@ -138,7 +136,7 @@ public class KafkaLiveLockAvoider2 {
        protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) {
                
                String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId;
-               //Watcher processNodeWatcher = createWatcher();
+               
                
                try {
                        
index 30209f0..735e372 100644 (file)
@@ -41,11 +41,7 @@ import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
 
-//import kafka.FailedToSendMessageException;
-//import kafka.javaapi.producer.Producer;
-//import kafka.producer.KeyedMessage;
-//import kafka.producer.ProducerConfig;
-//import kafka.producer.KeyedMessage;
+
 
 /**
  * Sends raw JSON objects into Kafka.
@@ -76,26 +72,23 @@ public class KafkaPublisher implements Publisher {
                        
                        kafkaConnUrl="localhost:9092";
                }
-               //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf";
-        //     props.put("bootstrap.servers", bootSever);
-       //System.setProperty("java.security.auth.login.config",jaaspath);
+               
        
-               /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
-               transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
-               transferSetting( props, "sasl.mechanism", "PLAIN");*/
+       
+               
                transferSetting( props, "bootstrap.servers",kafkaConnUrl);
-                       //transferSetting( props, "metadata.broker.list", kafkaConnUrl);
+                       
                transferSetting( props, "request.required.acks", "1");
                transferSetting( props, "message.send.max.retries", "5");
                transferSetting(props, "retry.backoff.ms", "150"); 
 
-               //props.put("serializer.class", "kafka.serializer.StringEncoder");
+               
                
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
-               //fConfig = new ProducerConfig(props);
-               //fProducer = new Producer<String, String>(fConfig);
+               
+               
                fProducer = new KafkaProducer<>(props);
        }
 
@@ -180,11 +173,11 @@ public class KafkaPublisher implements Publisher {
                        throws IOException {
                log.info("sending " + msgs.size() + " events to [" + topic + "]");
 try{
-               final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size());
+               final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
                        for (message o : msgs) {
                        
-                       final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString());
-                       //kms.add(data);
+                       final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
+                       
                
                try {
 
@@ -200,7 +193,7 @@ try{
 }
        //private final rrNvReadable fSettings;
 
-       //private ProducerConfig fConfig;
+       
        private Producer<String, String> fProducer;
 
   /**
@@ -227,9 +220,5 @@ try{
                
        }
 
-       //@Override
-       //public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
-               // TODO Auto-generated method stub
-               
-       //}
+       
 }
\ No newline at end of file
index 0c34bfd..237cac8 100644 (file)
@@ -35,6 +35,9 @@ import com.att.dmf.mr.backends.ConsumerFactory;
  */
 public class MemoryConsumerFactory implements ConsumerFactory
 {
+
+       private final MemoryQueue fQueue;
+       
        /**
         * 
         * Initializing constructor
@@ -59,8 +62,6 @@ public class MemoryConsumerFactory implements ConsumerFactory
                return new MemoryConsumer ( topic, consumerGroupId );
        }
 
-       private final MemoryQueue fQueue;
-
        /**
         * 
         * Define nested inner class
@@ -68,6 +69,12 @@ public class MemoryConsumerFactory implements ConsumerFactory
         */
        private class MemoryConsumer implements Consumer
        {
+
+               private final String fTopic;
+               private final String fConsumer;
+               private final long fCreateMs;
+               private long fLastAccessMs;
+               
                /**
                 * 
                 * Initializing MemoryConsumer constructor 
@@ -93,11 +100,6 @@ public class MemoryConsumerFactory implements ConsumerFactory
                        return fQueue.get ( fTopic, fConsumer );
                }
 
-               private final String fTopic;
-               private final String fConsumer;
-               private final long fCreateMs;
-               private long fLastAccessMs;
-
                @Override
                public boolean close() {
                        //Nothing to close/clean up.
@@ -168,7 +170,7 @@ public class MemoryConsumerFactory implements ConsumerFactory
         */
        public Collection<? extends Consumer> getConsumers ()
        {
-               return new ArrayList<MemoryConsumer> ();
+               return new ArrayList<> ();
        }
 
        @Override
index 22f0588..e0c80bd 100644 (file)
@@ -39,6 +39,10 @@ import com.att.nsa.security.NsaApiKey;
  *
  */
 public class MemoryMetaBroker implements Broker {
+
+       private final MemoryQueue fQueue;
+       private final HashMap<String, MemTopic> fTopics;
+       
        /**
         * 
         * @param mq
@@ -46,9 +50,9 @@ public class MemoryMetaBroker implements Broker {
         * @param settings
         */
        public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) {
-       //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) {
+       
                fQueue = mq;
-               fTopics = new HashMap<String, MemTopic>();
+               fTopics = new HashMap<>();
        }
 
        @Override
@@ -78,10 +82,16 @@ public class MemoryMetaBroker implements Broker {
                fQueue.removeTopic(topic);
        }
 
-       private final MemoryQueue fQueue;
-       private final HashMap<String, MemTopic> fTopics;
-
        private static class MemTopic implements Topic {
+
+               private final String fName;
+               private final String fDesc;
+               private final String fOwner;
+               private NsaAcl fReaders;
+               private NsaAcl fWriters;
+               private boolean ftransactionEnabled;
+               private String accessDenied = "User does not own this topic ";
+               
                /**
                 * constructor initialization
                 * 
@@ -141,7 +151,7 @@ public class MemoryMetaBroker implements Broker {
                @Override
                public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
                        if (!fOwner.equals(asUser.getKey())) {
-                               throw new AccessDeniedException("User does not own this topic " + fName);
+                               throw new AccessDeniedException(accessDenied + fName);
                        }
                        if (fWriters == null) {
                                fWriters = new NsaAcl();
@@ -152,7 +162,7 @@ public class MemoryMetaBroker implements Broker {
                @Override
                public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
                        if (!fOwner.equals(asUser.getKey())) {
-                               throw new AccessDeniedException("User does not own this topic " + fName);
+                               throw new AccessDeniedException(accessDenied + fName);
                        }
                        fWriters.remove(publisherId);
                }
@@ -160,7 +170,7 @@ public class MemoryMetaBroker implements Broker {
                @Override
                public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
                        if (!fOwner.equals(asUser.getKey())) {
-                               throw new AccessDeniedException("User does not own this topic " + fName);
+                               throw new AccessDeniedException(accessDenied + fName);
                        }
                        if (fReaders == null) {
                                fReaders = new NsaAcl();
@@ -171,18 +181,11 @@ public class MemoryMetaBroker implements Broker {
                @Override
                public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
                        if (!fOwner.equals(asUser.getKey())) {
-                               throw new AccessDeniedException("User does not own this topic " + fName);
+                               throw new AccessDeniedException(accessDenied + fName);
                        }
                        fReaders.remove(consumerId);
                }
 
-               private final String fName;
-               private final String fDesc;
-               private final String fOwner;
-               private NsaAcl fReaders;
-               private NsaAcl fWriters;
-               private boolean ftransactionEnabled;
-
                @Override
                public boolean isTransactionEnabled() {
                        return ftransactionEnabled;
@@ -190,7 +193,7 @@ public class MemoryMetaBroker implements Broker {
 
                @Override
                public Set<String> getOwners() {
-                       final TreeSet<String> set = new TreeSet<String> ();
+                       final TreeSet<String> set = new TreeSet<> ();
                        set.add ( fOwner );
                        return set;
                }
index 0629972..8ab4619 100644 (file)
@@ -43,7 +43,7 @@ public class MemoryQueue {
         * constructor storing hashMap objects in Queue and Offsets object
         */
        public MemoryQueue() {
-               fQueue = new HashMap<String, LogBuffer>();
+               fQueue = new HashMap<>();
                fOffsets = new HashMap<String, HashMap<String, Integer>>();
        }
 
@@ -102,7 +102,7 @@ public class MemoryQueue {
 
                HashMap<String, Integer> offsetMap = fOffsets.get(consumerName);
                if (offsetMap == null) {
-                       offsetMap = new HashMap<String, Integer>();
+                       offsetMap = new HashMap<>();
                        fOffsets.put(consumerName, offsetMap);
                }
                Integer offset = offsetMap.get(topic);
@@ -169,7 +169,7 @@ public class MemoryQueue {
                public LogBuffer(int maxSize) {
                        fBaseOffset = 0;
                        fMaxSize = maxSize;
-                       fList = new ArrayList<String>();
+                       fList = new ArrayList<>();
                }
 
                /**
index 5f28367..8cbf64f 100644 (file)
@@ -35,9 +35,6 @@ import com.att.dmf.mr.exception.DMaaPResponseCode;
 import com.att.dmf.mr.exception.ErrorResponse;
 import com.att.dmf.mr.utils.Utils;
 
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
@@ -54,6 +51,15 @@ import com.att.nsa.metrics.impl.CdmRateTicker;
  */
 @Component
 public class DMaaPCambriaLimiter {
+       private final HashMap<String, RateInfo> fRateInfo;
+       private final HashMap<String, RateInfoCheck> fRateInfoCheck;
+       private final double fMaxEmptyPollsPerMinute;
+       private final double fMaxPollsPerMinute;
+       private final int fWindowLengthMins;
+       private final long fSleepMs;
+       private final long fSleepMs1;
+       private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+       
        /**
         * constructor initializes
         * 
@@ -62,10 +68,9 @@ public class DMaaPCambriaLimiter {
         * @throws invalidSettingValue
         */
        @Autowired
-       public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
-                       throws missingReqdSetting, invalidSettingValue {
-                       fRateInfo = new HashMap<String, RateInfo>();
-               fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+       public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
+                       fRateInfo = new HashMap<>();
+               fRateInfoCheck = new HashMap<>();
                fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
                                CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
                fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
@@ -78,19 +83,7 @@ public class DMaaPCambriaLimiter {
                                5000);
                
        }
-
-       /**
-        * static method provide the sleep time
-        * 
-        * @param ratePerMinute
-        * @return
-        */
-       public static long getSleepMsForRate(double ratePerMinute) {
-               if (ratePerMinute <= 0.0)
-                       return 0;
-               return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
-       }
-
+       
        /**
         * Construct a rate limiter.
         * 
@@ -111,8 +104,8 @@ public class DMaaPCambriaLimiter {
         * @param windowLengthMins
         */
        public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
-               fRateInfo = new HashMap<String, RateInfo>();
-               fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+               fRateInfo = new HashMap<>();
+               fRateInfoCheck = new HashMap<>();
                fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
                fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
                fWindowLengthMins = windowLengthMins;
@@ -120,6 +113,18 @@ public class DMaaPCambriaLimiter {
                fSleepMs1 = Math.max(0, sleepMS1);
        }
 
+       /**
+        * static method provide the sleep time
+        * 
+        * @param ratePerMinute
+        * @return
+        */
+       public static long getSleepMsForRate(double ratePerMinute) {
+               if (ratePerMinute <= 0.0)
+                       return 0;
+               return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
+       }
+
        /**
         * Tell the rate limiter about a call to a topic/group/id. If the rate is
         * too high, this call delays its return and throws an exception.
@@ -151,6 +156,7 @@ public class DMaaPCambriaLimiter {
                                        log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
                                }
                        } catch (InterruptedException e) {
+                               log.error("Exception "+ e);
                                // ignore
                        }
                        
@@ -163,37 +169,7 @@ public class DMaaPCambriaLimiter {
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                }
-               /*if (fMaxPollsPerMinute <= 0) {
-                       return;
-               }
                
-               final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
-               final double ratevalue = ric.onCall();
-               if (ratevalue > fMaxPollsPerMinute) {
-                       try {
-                               log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
-                                               + ".");
-                               if (fSleepMs1 > fMaxPollsPerMinute) {
-                               log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
-                                                       + " ms sleep, then responding in error.");
-                                       Thread.sleep(fSleepMs1);
-                                       ric.reset();
-                               } else {
-                                       log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
-                               }
-                       } catch (InterruptedException e) {
-                               // ignore
-                       }
-                       
-                       
-                       ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
-                                       DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
-                                       "This client is making too many requests "
-                                                       + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
-                       
-                       log.info(errRes.toString());
-                       throw new CambriaApiException(errRes);
-               }*/
                
        }
 
@@ -213,6 +189,8 @@ public class DMaaPCambriaLimiter {
        }
 
        private static class RateInfo {
+               private final String fLabel;
+               private final CdmRateTicker fCallRateSinceLastMsgSend;
                /**
                 * constructor initialzes
                 * 
@@ -244,14 +222,14 @@ public class DMaaPCambriaLimiter {
                        fCallRateSinceLastMsgSend.tick();
                        return fCallRateSinceLastMsgSend.getRate();
                }
-
-               private final String fLabel;
-               private final CdmRateTicker fCallRateSinceLastMsgSend;
        }
        
        
        
        private static class RateInfoCheck {
+               
+               private final String fLabel;
+               private final CdmRateTicker fCallRateSinceLastMsgSend;
                /**
                 * constructor initialzes
                 * 
@@ -283,21 +261,10 @@ public class DMaaPCambriaLimiter {
                        fCallRateSinceLastMsgSend.tick();
                        return fCallRateSinceLastMsgSend.getRate();
                }
-
-               private final String fLabel;
-               private final CdmRateTicker fCallRateSinceLastMsgSend;
        }
        
        
-       private final HashMap<String, RateInfo> fRateInfo;
-       private final HashMap<String, RateInfoCheck> fRateInfoCheck;
-       private final double fMaxEmptyPollsPerMinute;
-       private final double fMaxPollsPerMinute;
-       private final int fWindowLengthMins;
-       private final long fSleepMs;
-       private final long fSleepMs1;
-       //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
-       private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+       
        
        private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
                final String key = makeKey(topic, consumerGroup, clientId);
@@ -310,15 +277,7 @@ public class DMaaPCambriaLimiter {
        }
        
        
-       private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
-               final String key = makeKey(topic, consumerGroup, clientId);
-               RateInfoCheck ri = fRateInfoCheck.get(key);
-               if (ri == null) {
-                       ri = new RateInfoCheck(key, 1);
-                       fRateInfoCheck.put(key, ri);
-               }
-               return ri;
-       }
+       
 
        
        
index 6fc0838..f60fd53 100644 (file)
@@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
 import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
 import com.att.dmf.mr.constants.CambriaConstants;
 import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
@@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
  */
 public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
-       // private static final Logger log = LoggerFactory
-       // .getLogger(DMaaPKafkaConsumerFactory.class);
+       
        private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
-       // @Autowired
-       // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new
-       // KafkaLiveLockAvoider();
+       
 
        /**
         * constructor initialization
@@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
                final boolean isCacheEnabled = kSetting_EnableCache;
 
-               // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
-               // metrics) : null;
+               
                fCache = null;
                if (isCacheEnabled) {
                        fCache = KafkaConsumerCache.getInstance();
@@ -189,14 +185,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
                                        log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
                                                        + "], on topic [" + topic + "].");
-
-                                       fCache.signalOwnership(topic, consumerGroupName, consumerId);
-
+                                       
+                                       if (fCache != null) {
+                                               fCache.signalOwnership(topic, consumerGroupName, consumerId);
+                                       }
+                                       
                                        final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
                                        long fCreateTimeMs = System.currentTimeMillis();
                                        KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
-                                       kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj()
-                                                                                                                                                                                                                               // );
+                                       kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
                                        log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
 
                                        if (fCache != null) {
@@ -265,10 +262,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
        private void transferSettingIfProvided(Properties target, String key, String prefix) {
                String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
 
-               // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
+               
                if (null != keyVal) {
-                       // final String val = fSettings
-                       // .getString(makeLongKey(key, prefix), "");
+               
                        log.info("Setting [" + key + "] to " + keyVal + ".");
                        target.put(key, keyVal);
                }
@@ -294,10 +290,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
                props.put("group.id", fakeGroupName);
                props.put("enable.auto.commit", "false"); // 0.11
                props.put("bootstrap.servers", fkafkaBrokers);
-               /*props.put("sasl.jaas.config",
-                               "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
-               props.put("security.protocol", "SASL_PLAINTEXT");
-               props.put("sasl.mechanism", "PLAIN");*/
+
+               
                props.put("client.id", consumerId);
 
                // additional settings: start with our defaults, then pull in configured
index 643eae9..4bef985 100644 (file)
@@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader;
 //import org.apache.log4-j.Logger;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
+
 import com.att.nsa.configs.ConfigDb;
 import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.configs.ConfigPath;
@@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                
                
             props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-           /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
-                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
-            props.put("sasl.mechanism", "PLAIN");*/
+          
             fKafkaAdminClient=AdminClient.create ( props );
-           // fKafkaAdminClient = null;
+           
        }
 
        //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
@@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                
                
             props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-            /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
-                props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");            
-            props.put("sasl.mechanism", "PLAIN");*/
+            
             fKafkaAdminClient=AdminClient.create ( props );
-           // fKafkaAdminClient = null;
+           
                
                
        }
        
        public DMaaPKafkaMetaBroker( rrNvReadable settings,
                        ZkClient zk,  ConfigDb configDb,AdminClient client) {
-               //fSettings = settings;
+               
                fZk = zk;
                fCambriaConfig = configDb;
                fBaseTopicData = configDb.parse("/topics");
            fKafkaAdminClient= client;
-           // fKafkaAdminClient = null;
+          
                
                
        }
@@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        }
                        catch ( InterruptedException e )
                        {
-                               //timer.fail ( "Timeout" );
+                               
                                log.warn ( "Execution of describeTopics timed out." );
                                throw new ConfigDbException ( e );
                        }
                        catch ( ExecutionException e )
                        {
-                               //timer.fail ( "ExecutionError" );
+                               
                                log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
                                throw new ConfigDbException ( e.getCause () );
                        }
@@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
                        log.info("Loading zookeeper client for topic deletion.");
                                        // topic creation. (Otherwise, the topic is only partially created
                        // in ZK.)
-                       /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
-                                       ZKStringSerializer$.MODULE$);
-                       String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
-                       if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
-                       ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
-                       */
+                       
                        
                        fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
                        log.info("Zookeeper client loaded successfully. Deleting topic.");
-                       //AdminUtils.deleteTopic(zkutils, topic);
+                       
                } catch (Exception e) {
                        log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
                        throw new ConfigDbException(e);
index 9942837..4c9532b 100644 (file)
@@ -84,9 +84,9 @@ public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSe
         * 
         * @param cs
         */
-       //public DMaaPMetricsSet() {
+       
                public DMaaPMetricsSet(rrNvReadable cs) {
-               //fSettings = cs;
+               
                fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
                super.putItem("version", fVersion);
 
index e29403f..963ff2d 100644 (file)
@@ -23,7 +23,7 @@ package com.att.dmf.mr.beans;
 
 import java.security.Key;
 
-//import org.apache.log4-j.Logger;
+
 import org.springframework.beans.factory.annotation.Autowired;
 
 import com.att.dmf.mr.constants.CambriaConstants;
@@ -48,11 +48,11 @@ import com.att.nsa.util.rrConvertor;
  */
 public class DMaaPNsaApiDb {
        
-       //private rrNvReadable settings;
+       
        private DMaaPZkConfigDb cdb;
        
        //private static final Logger log = Logger
-               //      .getLogger(DMaaPNsaApiDb.class.toString());
+               
        private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class);
        
 /**
@@ -63,7 +63,7 @@ public class DMaaPNsaApiDb {
  */
        @Autowired
        public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) {
-               //this.setSettings(settings);
+               
                this.setCdb(cdb);
        }
        /**
@@ -79,16 +79,16 @@ public class DMaaPNsaApiDb {
                        missingReqdSetting {
                // Cambria uses an encrypted api key db
 
-               //final String keyBase64 = settings.getString("cambria.secureConfig.key",                       null);
+               
                final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key");
                
                
-       //      final String initVectorBase64 = settings.getString(                             "cambria.secureConfig.iv", null);
+       
        final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv");
                // if neither value was provided, don't encrypt api key db
                if (keyBase64 == null && initVectorBase64 == null) {
                        log.info("This server is configured to use an unencrypted API key database. See the settings documentation.");
-                       return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb,
+                       return new BaseNsaApiDbImpl<>(cdb,
                                        new NsaSimpleApiKeyFactory());
                } else if (keyBase64 == null) {
                        // neither or both, otherwise something's goofed
@@ -100,7 +100,7 @@ public class DMaaPNsaApiDb {
                        log.info("This server is configured to use an encrypted API key database.");
                        final Key key = EncryptingLayer.readSecretKey(keyBase64);
                        final byte[] iv = rrConvertor.base64Decode(initVectorBase64);
-                       return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb,
+                       return new EncryptingApiDbImpl<>(cdb,
                                        new NsaSimpleApiKeyFactory(), key, iv);
                }
        }
@@ -109,17 +109,17 @@ public class DMaaPNsaApiDb {
         * @return
         * returns settings
         */
-/*     public rrNvReadable getSettings() {
-               return settings;
-       }*/
+
+               
+       
 
        /**
         * @param settings
         * set settings
         */
-       /*public void setSettings(rrNvReadable settings) {
-               this.settings = settings;
-       }*/
+       
+               
+       
 
         /**
         * @return
index d543721..5aa25fa 100644 (file)
@@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import com.att.dmf.mr.utils.ConfigurationReader;
 import com.att.nsa.configs.confimpl.ZkConfigDb;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import com.att.nsa.configs.confimpl.ZkConfigDb;
+
 /**
  * Provide the zookeeper config db connection 
  * @author nilanjana.maity
@@ -42,7 +42,7 @@ public class DMaaPZkConfigDb extends ZkConfigDb {
        public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
                        @Qualifier("propertyReader") rrNvReadable settings) {
                
-               //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot));
+               
                super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot());
                
        }
index a971c3f..db691bd 100644 (file)
@@ -35,7 +35,7 @@ import javax.ws.rs.ext.ExceptionMapper;
 import javax.ws.rs.ext.Provider;
 
 import org.apache.http.HttpStatus;
-//import org.apache.log-4j.Logger;
+
 import org.springframework.beans.factory.annotation.Autowired;
 
 import com.att.eelf.configuration.EELFLogger;
@@ -51,8 +51,7 @@ import com.att.eelf.configuration.EELFManager;
 @Singleton
 public class DMaaPWebExceptionMapper implements ExceptionMapper<WebApplicationException>{
        
-       //private static final Logger LOGGER = Logger
-               //      .getLogger(DMaaPWebExceptionMapper.class);
+       
        private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DMaaPWebExceptionMapper.class);
        private ErrorResponse errRes;
        
index 6022b91..64b20e8 100644 (file)
@@ -35,7 +35,7 @@ import com.att.eelf.configuration.EELFManager;
 public class CambriaServletContextListener implements ServletContextListener {
        
        DME2EndPointLoader loader = DME2EndPointLoader.getInstance();
-//     private static Logger log = Logger.getLogger(CambriaServletContextListener.class);
+
        private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaServletContextListener.class);
        
 
index 7f27798..f61b6ea 100644 (file)
@@ -51,7 +51,7 @@ public class DME2EndPointLoader {
        private String protocol;
        private String serviceURL;
        private static DME2EndPointLoader loader = new DME2EndPointLoader();
-//     private static final Logger LOG = LoggerFactory.getLogger(EventsServiceImpl.class);
+
        private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
        private DME2EndPointLoader() {
        }
index 422a2cc..d191070 100644 (file)
@@ -39,16 +39,16 @@ public interface Topic extends ReadWriteSecuredResource
         *
         *//*
        public class AccessDeniedException extends Exception
-       {       
+       
                *//**
                 * AccessDenied Description
                 *//*
-               public AccessDeniedException () { super ( "Access denied." ); } 
+               
                *//**
                 * AccessDenied Exception for the user while authenticating the user request
                 * @param user
                 *//*
-               public AccessDeniedException ( String user ) { super ( "Access denied for " + user ); } 
+               
                private static final long serialVersionUID = 1L;
        }*/
 
index 0993aa6..4b219b1 100644 (file)
  *******************************************************************************/
 package com.att.dmf.mr.metrics.publisher;
 
-//import org.slf4j.Logger;
+
 
 //
 import com.att.eelf.configuration.EELFLogger;
-//import com.att.eelf.configuration.EELFManager;
+
 
 /**
  * 
index 1510c32..46dfa99 100644 (file)
@@ -95,7 +95,7 @@ public class CambriaPublisherUtility
         */
        public static List<HttpHost> createHostsList(Collection<String> hosts)
        {
-               final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> ();
+               final ArrayList<HttpHost> convertedHosts = new ArrayList<>();
                for ( String host : hosts )
                {
                        if ( host.length () == 0 ) continue;
index d02438f..9158c96 100644 (file)
@@ -386,12 +386,7 @@ public class DMaaPCambriaClientFactory {
         *            Your API secret
         * @return an identity manager
         */
-       /*
-        * public static CambriaIdentityManager createIdentityManager (
-        * Collection<String> hostSet, String apiKey, String apiSecret ) { final
-        * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet );
-        * cim.setApiCredentials ( apiKey, apiSecret ); return cim; }
-        */
+       
 
        /**
         * Create a topic manager for working with topics.
@@ -405,12 +400,7 @@ public class DMaaPCambriaClientFactory {
         *            Your API secret
         * @return a topic manager
         */
-       /*
-        * public static CambriaTopicManager createTopicManager ( Collection<String>
-        * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi
-        * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey,
-        * apiSecret ); return tmi; }
-        */
+       
 
        /**
         * Inject a consumer. Used to support unit tests.
index 08b2fd1..ebdf3ed 100644 (file)
@@ -31,7 +31,7 @@ import org.json.JSONArray;
 import org.json.JSONException;
 
 import com.att.dmf.mr.constants.CambriaConstants;
-//import org.slf4j.Logger;
+
 //import org.slf4j.LoggerFactory;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
@@ -52,12 +52,12 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr
 
        public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException
        {
-               /*super ( hosts, CambriaConstants.kStdCambriaServicePort, clientSignature,
-                       CacheUse.NONE, 1, 1, TimeUnit.MILLISECONDS );*/
+               
+                       
                
                super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
 
-               //fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+               
                fLog = EELFManager.getInstance().getLogger(this.getClass().getName());
                //( this.getClass().getName () );
        }
@@ -85,7 +85,7 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr
        {
                fLog = log; 
                
-               //replaceLogger ( log );
+               
        }
 
        public EELFLogger  getLog ()
index d8d8799..dee9e57 100644 (file)
@@ -186,7 +186,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
        public void close() {
                try {
                        final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-                       if (remains.size() > 0) {
+                       if (remains.isEmpty()) {
                                getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
                                                + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
                        }
@@ -251,7 +251,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
         */
        private synchronized boolean shouldSendNow() {
                boolean shouldSend = false;
-               if (fPending.size() > 0) {
+               if (fPending.isEmpty()) {
                        final long nowMs = Clock.now();
 
                        shouldSend = (fPending.size() >= fMaxBatchSize);
@@ -273,7 +273,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
        private synchronized boolean sendBatch() {
                // it's possible for this call to be made with an empty list. in this
                // case, just return.
-               if (fPending.size() < 1) {
+               if (fPending.isEmpty()) {
                        return true;
                }
 
@@ -305,8 +305,8 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
 
                        // code from REST Client Starts
 
-                       // final String serverCalculatedSignature = sha1HmacSigner.sign
-                       // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV");
+                       
+                       
 
                        Client client = ClientBuilder.newClient();
                        String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
@@ -323,32 +323,19 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
                        Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria");
 
                        Response response = target.request().post(data);
-                       // header("X-CambriaAuth",
-                       // "2OH46YIWa329QpEF:"+serverCalculatedSignature).
-                       // header("X-CambriaDate", "2015-09-21T11:38:19-0700").
-                       // post(Entity.json(baseStream.toByteArray()));
-
+                       
                        getLog().info("Response received :: " + response.getStatus());
                        getLog().info("Response received :: " + response.toString());
 
                        // code from REST Client Ends
 
-                       /*
-                        * final JSONObject result = post ( url, contentType,
-                        * baseStream.toByteArray(), true ); final String logLine =
-                        * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" +
-                        * result.toString (); getLog().info ( logLine );
-                        */
+                       
                        fPending.clear();
                        return true;
                } catch (IllegalArgumentException x) {
                        getLog().warn(x.getMessage(), x);
                }
-               /*
-                * catch ( HttpObjectNotFoundException x ) { getLog().warn (
-                * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn (
-                * x.getMessage(), x ); }
-                */
+               
                catch (IOException x) {
                        getLog().warn(x.getMessage(), x);
                }
index 98ddb50..7a67c92 100644 (file)
@@ -81,10 +81,7 @@ public class CambriaJsonStreamReader implements reader {
 
                        final int c = fTokens.next();
                        
-                       /*if (c ==','){
-                               fCloseCount++;
-                               System.out.println("fCloseCount=" + fCloseCount +" fCount "+fCount);
-                       }*/
+                       
                        if (fIsList) {
                                if (c == ']' || (fCount > 0 && c == 10))
                                        return null;
@@ -125,7 +122,7 @@ public class CambriaJsonStreamReader implements reader {
                 * 
                 * @param o
                 */
-               //public msg(JSONObject o){}
+               
                
                
                public msg(JSONObject o) {
index 376d140..f64c0de 100644 (file)
@@ -137,5 +137,5 @@ public class CambriaRawStreamReader implements reader
        private final InputStream fStream;
        private final String fDefPart;
        private boolean fClosed;
-       //private String transactionId;
+       
 }
index b550373..ed0893d 100644 (file)
@@ -46,7 +46,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
                        
                        auth = true;
                }
-               //System.out.println("role " +role +"    user: "+ req.getRemoteUser() +"   : auth="+auth);
+               
                return auth;
        }
 
@@ -57,7 +57,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
                String permission = "";
                String nameSpace ="";
                if(topicName.contains(".") && topicName.contains("com.att")) {
-                       //String topic = topicName.substring(topicName.lastIndexOf(".")+1);
+                       
                        nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
                }
                else {
@@ -67,12 +67,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
                        if(null==nameSpace)nameSpace="com.att.dmaap.mr.ueb";
                        
                        
-                       /*ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                       DMaaPResponseCode.TOPIC_NOT_IN_AAF.getResponseCode(), "Topic does not exist in AAF"
-                                                       , null, Utils.getFormattedDate(new Date()), topicName,
-                                       null, null, null, null);
-                                       
-                       throw new CambriaApiException(errRes);*/
+                       
                }
                
                permission = nameSpace+".mr.topic|:topic."+topicName+"|"+action;
index e9f28ae..64dbc14 100644 (file)
@@ -25,7 +25,7 @@ import javax.servlet.http.HttpServletRequest;
 
 import com.att.dmf.mr.beans.DMaaPContext;
 import com.att.dmf.mr.security.DMaaPAuthenticator;
-//import com.att.nsa.security.db.NsaApiDb;
+
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.security.NsaApiKey;
@@ -65,7 +65,7 @@ public class DMaaPMechIdAuthenticator <K extends NsaApiKey> implements DMaaPAuth
                log.info ( "AUTH-LOG(" + remoteAddr + "): " + msg );
        }
 
-//     private final NsaApiDb<K> fDb;
+
        //private static final Logger log = Logger.getLogger( MechIdAuthenticator.class.toString());
        private static final EELFLogger log = EELFManager.getInstance().getLogger(MechIdAuthenticator.class);
 /**
index a26c9e7..b1e28e7 100644 (file)
@@ -54,9 +54,9 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP
        public DMaaPOriginalUebAuthenticator(NsaApiDb<K> db, long requestTimeWindowMs) {
                fDb = db;
                fRequestTimeWindowMs = requestTimeWindowMs;
-               //fAuthenticators = new LinkedList<DMaaPAuthenticator<K>>();
+               
 
-               //fAuthenticators.add(new DMaaPOriginalUebAuthenticator<K>(db, requestTimeWindowMs));
+               
 
        }
 
@@ -243,51 +243,51 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP
                    "EEEE, dd-MMM-yy HH:mm:ss zzz",
                };
 
-       /*private static final String kDateFormats[] = {
-                       // W3C date format (RFC 3339).
-                       "yyyy-MM-dd'T'HH:mm:ssz",
+       
+                       
+                       
 
-                       // Preferred HTTP date format (RFC 1123).
-                       "EEE, dd MMM yyyy HH:mm:ss zzz",
+               
+                       
 
-                       // simple unix command line 'date' format
-                       "EEE MMM dd HH:mm:ss z yyyy",
+                       
+                       
 
-                       // Common date format (RFC 822).
-                       "EEE, dd MMM yy HH:mm:ss z", "EEE, dd MMM yy HH:mm z", "dd MMM yy HH:mm:ss z", "dd MMM yy HH:mm z",
+                       
+                       
 
-                       // Obsoleted HTTP date format (ANSI C asctime() format).
-                       "EEE MMM dd HH:mm:ss yyyy",
+                       
+                       
 
-                       // Obsoleted HTTP date format (RFC 1036).
-                       "EEEE, dd-MMM-yy HH:mm:ss zzz", }; */
+                       
+                       
        // logger declaration
-       //private static final Logger log = Logger.getLogger(DMaaPOriginalUebAuthenticator.class.toString());
+       
        private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPOriginalUebAuthenticator.class);
        @Override
-//     public K authenticate(DMaaPContext ctx) {
+
                // TODO Auto-generated method stub
-               //return null;
+               
        //}
        
        public K authenticate(DMaaPContext ctx) {
                
-               /*final HttpServletRequest req = ctx.getRequest();
-               for (DMaaPAuthenticator<K> a : fAuthenticators) {
-                       if (a.qualify(req)) {
-                               final K k = a.isAuthentic(req);
-                               if (k != null)
-                                       return k;
-                       }
-                       // else: this request doesn't look right to the authenticator
-               }*/
+               
+               
+                       
+                               
+                               
+                                       
+                       
+                       
+               
                return null;
        }
 
 
        public void addAuthenticator ( DMaaPAuthenticator<K> a )
        {
-               //this.fAuthenticators.add(a);
+               
        }
-       //private final LinkedList<DMaaPAuthenticator<K>> fAuthenticators;
+       
 }
\ No newline at end of file
index 110970f..f7c48de 100644 (file)
@@ -42,7 +42,7 @@ import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.limits.Blacklist;
 import com.att.nsa.security.NsaApiKey;
 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-//import com.att.sa.highlandPark.util.HpJsonUtil;
+
 
 /**
  * @author muzainulhaque.qazi
index c818f88..b0e8a86 100644 (file)
@@ -54,7 +54,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey;
 @Service
 public class ApiKeysServiceImpl implements ApiKeysService {
 
-       //private Logger log = Logger.getLogger(ApiKeysServiceImpl.class.toString());
+       
        private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString());
        /**
         * This method will provide all the ApiKeys present in kafka server.
@@ -139,7 +139,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
                         String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous");
                         if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false";
                         
-            // if ((contactEmail == null) || (contactEmail.length() == 0))
+           
                         if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true")    &&  !emailProvided   )
              {
                DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address.");
@@ -165,7 +165,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
                        log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : "
                                        + key.toString() + "=====");
                        apiKeyDb.saveApiKey(key);
-                       // System.out.println("here4");
+                       
                        // email out the secret to validate the email address
                        if ( emailProvided )
                        {
@@ -196,9 +196,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
                                );
                        DMaaPResponseBuilder.respondOk(dmaapContext,
                                        o);
-                        /*o.put("secret", "Emailed to " + contactEmail + ".");
-                       DMaaPResponseBuilder.respondOk(dmaapContext,
-                                       o); */
+                       
                        return;
                } else {
                        log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.=====");
index 4ca6446..22b60fe 100644 (file)
@@ -94,7 +94,7 @@ import com.att.nsa.util.rrConvertor;
 @Service
 public class EventsServiceImpl implements EventsService {
        // private static final Logger LOG =
-       // Logger.getLogger(EventsServiceImpl.class);
+       
        private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
 
        private static final String BATCH_LENGTH = "event.batch.length";
@@ -103,10 +103,10 @@ public class EventsServiceImpl implements EventsService {
        private DMaaPErrorMessages errorMessages;
        
        //@Autowired
-       //KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+       
 
        // @Value("${metrics.send.cambria.topic}")
-       // private String metricsTopic;
+       
 
        public DMaaPErrorMessages getErrorMessages() {
                return errorMessages;
@@ -133,7 +133,7 @@ public class EventsServiceImpl implements EventsService {
                        CambriaApiException, IOException, DMaaPAccessDeniedException {
                final long startTime = System.currentTimeMillis();
                final HttpServletRequest req = ctx.getRequest();
-       //System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"+kafkaLiveLockAvoider);
+       
                boolean isAAFTopic = false;
                // was this host blacklisted?
                final String remoteAddr = Utils.getRemoteAddress(ctx);
@@ -158,7 +158,7 @@ public class EventsServiceImpl implements EventsService {
                if (strtimeoutMS != null)
                        timeoutMs = Integer.parseInt(strtimeoutMS);
                // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
-               // CambriaConstants.kNoTimeout);
+               
                if (req.getParameter("timeout") != null) {
                        timeoutMs = Integer.parseInt(req.getParameter("timeout"));
                }
@@ -214,7 +214,7 @@ public class EventsServiceImpl implements EventsService {
                // if headers are not provided then user will be null
                if (user == null && null != ctx.getRequest().getHeader("Authorization")) {
                        // the topic name will be sent by the client
-                       // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
+                       
                        DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
                        String permission = aaf.aafPermissionString(topic, "sub");
                        if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
@@ -234,7 +234,7 @@ public class EventsServiceImpl implements EventsService {
                logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
                                + " " + clientId);
                Consumer c = null;
-               // String localclientId = clientId;
+               
                String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                                "clusterhostid");
                if (null == lhostId) {
@@ -481,16 +481,16 @@ public class EventsServiceImpl implements EventsService {
                // start processing, building a batch to push to the backend
                final long startMs = System.currentTimeMillis();
                long count = 0;
-               long maxEventBatch = 1024 * 16;
+               long maxEventBatch = 1024L* 16;
                String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
                if (null != batchlen)
                        maxEventBatch = Long.parseLong(batchlen);
                // long maxEventBatch =
                // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
-               final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
+               final LinkedList<Publisher.message> batch = new LinkedList<>();
                // final ArrayList<KeyedMessage<String, String>> kms = new
                // ArrayList<KeyedMessage<String, String>>();
-               final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
+               final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
                try {
                        // for each message...
                        Publisher.message m = null;
@@ -592,7 +592,7 @@ public class EventsServiceImpl implements EventsService {
                // start processing, building a batch to push to the backend
                final long startMs = System.currentTimeMillis();
                long count = 0;
-               long maxEventBatch = 1024 * 16;
+               long maxEventBatch = 1024L * 16;
                String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
                if (null != evenlen)
                        maxEventBatch = Long.parseLong(evenlen);
index 83b3770..d867ea8 100644 (file)
@@ -49,7 +49,7 @@ import com.att.nsa.metrics.CdmMeasuredItem;
 @Component
 public class MetricsServiceImpl implements MetricsService {
 
-       //private static final Logger LOG = Logger.getLogger(MetricsService.class.toString());
+       
        private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MetricsService.class);
        /**
         * 
index 01ed1cc..7e9d783 100644 (file)
@@ -45,7 +45,7 @@ import com.att.dmf.mr.exception.DMaaPResponseCode;
 import com.att.dmf.mr.exception.ErrorResponse;
 import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
 import com.att.dmf.mr.metabroker.Broker1;
-//import com.att.dmf.mr.metabroker.Broker1;
+
 import com.att.dmf.mr.metabroker.Topic;
 import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
 import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
@@ -67,13 +67,13 @@ import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
 public class TopicServiceImpl implements TopicService {
 
        // private static final Logger LOGGER =
-       // Logger.getLogger(TopicServiceImpl.class);
+       
        private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
        @Autowired
        private DMaaPErrorMessages errorMessages;
 
        // @Value("${msgRtr.topicfactory.aaf}")
-       // private String mrFactory;
+       
 
        public DMaaPErrorMessages getErrorMessages() {
                return errorMessages;
@@ -125,7 +125,7 @@ public class TopicServiceImpl implements TopicService {
                for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
                        JSONObject obj = new JSONObject();
                        obj.put("topicName", topic.getName());
-                       // obj.put("description", topic.getDescription());
+                       
                        obj.put("owner", topic.getOwner());
                        obj.put("txenabled", topic.isTransactionEnabled());
                        topicsList.put(obj);
@@ -193,7 +193,7 @@ public class TopicServiceImpl implements TopicService {
 
                final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
                String key = null;
-               //String appName = dmaapContext.getRequest().getHeader("AppName");
+               
                String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
                                "enforced.topic.name.AAF");
 
@@ -209,55 +209,55 @@ public class TopicServiceImpl implements TopicService {
                                                "Failed to create topic: Access Denied.User does not have permission to perform create topic");
 
                                LOGGER.info(errRes.toString());
-                               // throw new DMaaPAccessDeniedException(errRes);
+                               
 
                        }
                }
-               // else if (user==null &&
+       
                // (null==dmaapContext.getRequest().getHeader("Authorization") && null
-               // == dmaapContext.getRequest().getHeader("cookie")) ) {
-               /*else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization")
-                               ) {
-                       LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
+               
+               
+                               
+                       
 
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                       "Failed to create topic: Access Denied.User does not have permission to perform create topic");
+                       
+                                       
+                                       
 
-                       LOGGER.info(errRes.toString());
-                       // throw new DMaaPAccessDeniedException(errRes);
-               }*/
+                       
+                       
+       
 
                if (user == null /*&& (null != dmaapContext.getRequest().getHeader("Authorization")
                                )*/) {
-                       // if (user == null &&
+                       
                        // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
-                       // null != dmaapContext.getRequest().getHeader("cookie"))) {
+                       
                        // ACL authentication is not provided so we will use the aaf
                        // authentication
-                       /*LOGGER.info("Authorization the topic");
+                       
 
-                       String permission = "";
-                       String nameSpace = "";
-                       if (topicBean.getTopicName().indexOf(".") > 1)
-                               nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
+                       
+                       
+                       
+                               
 
-                       String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                                       "msgRtr.topicfactory.aaf");
+                       
+                                       
 
-                       // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
+                       
 
-                       permission = mrFactoryVal + nameSpace + "|create";
-                       DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();*/
+                       
+                       
 
-                       //if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
+                       
                        if (false) {
                                LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
 
                                ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
                                                DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
                                                "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
-                                                               //+ permission);
+                                                               
                                                + "permission");
                                                
 
@@ -267,14 +267,14 @@ public class TopicServiceImpl implements TopicService {
                        } else {
                                // if user is null and aaf authentication is ok then key should
                                // be ""
-                               // key = "";
+                               
                                /**
                                 * Added as part of AAF user it should return username
                                 */
 
-                               //key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
-                               key="admin";
-                               LOGGER.info("key ==================== " + key);
+                               
+                               
+                               //LOGGER.info("key ==================== " + key);
 
                        }
                }
@@ -283,7 +283,7 @@ public class TopicServiceImpl implements TopicService {
                        final String topicName = topicBean.getTopicName();
                        final String desc = topicBean.getTopicDescription();
                        int partition = topicBean.getPartitionCount();
-                       // int replica = topicBean.getReplicationCount();
+                       
                        if (partition == 0) {
                                partition = 8;
                        }
@@ -291,7 +291,7 @@ public class TopicServiceImpl implements TopicService {
 
                        int replica = topicBean.getReplicationCount();
                        if (replica == 0) {
-                               //replica = 3;
+                               
                                replica = 1;
                        }
                        final int replicas = replica;
@@ -319,7 +319,7 @@ public class TopicServiceImpl implements TopicService {
                        throw new CambriaApiException(errRes);
                } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
                        // TODO Auto-generated catch block
-                       e.printStackTrace();
+                       LOGGER.error("Exception is at createTopic(  ) ", e);
                }
        }
 
@@ -503,25 +503,25 @@ public class TopicServiceImpl implements TopicService {
                LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
                final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
 
-               // if (user == null) {
+               
                //
                // LOGGER.info("Authenticating the user, as ACL authentication is not
-               // provided");
+               
                //// String permission =
-               // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+               
                //
-               // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-               // String permission = aaf.aafPermissionString(topicName, "manage");
+               
+               
                // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
                // {
                // LOGGER.error("Failed to permit write access to producer [" +
                // producerId + "] for topic " + topicName
-               // + ". Authentication failed.");
+               
                // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
                // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
                // errorMessages.getNotPermitted1()+" <Grant publish permissions>
-               // "+errorMessages.getNotPermitted2()+ topicName);
-               // LOGGER.info(errRes);
+               
+               
                // throw new DMaaPAccessDeniedException(errRes);
                // }
                // }
@@ -561,25 +561,25 @@ public class TopicServiceImpl implements TopicService {
 
                LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
                final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-               // if (user == null) {
+               
                //
                //// String permission =
-               // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+               
                // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
                // String permission = aaf.aafPermissionString(topicName, "manage");
                // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
                // {
                // LOGGER.error("Failed to revoke write access to producer [" +
                // producerId + "] for topic " + topicName
-               // + ". Authentication failed.");
+               
                // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
                // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
                // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
-               // "+errorMessages.getNotPermitted2()+ topicName);
-               // LOGGER.info(errRes);
+               
+               
                // throw new DMaaPAccessDeniedException(errRes);
                //
-               // }
+       
                // }
 
                Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
@@ -612,22 +612,22 @@ public class TopicServiceImpl implements TopicService {
 
                LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
                final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-               // if (user == null) {
+               
                //
                //// String permission =
-               // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
-               // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+               
+               
                // String permission = aaf.aafPermissionString(topicName, "manage");
                // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
                // {
                // LOGGER.error("Failed to permit read access to consumer [" +
                // consumerId + "] for topic " + topicName
-               // + ". Authentication failed.");
+               
                // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
                // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
                // errorMessages.getNotPermitted1()+" <Grant consume permissions>
-               // "+errorMessages.getNotPermitted2()+ topicName);
-               // LOGGER.info(errRes);
+               
+               
                // throw new DMaaPAccessDeniedException(errRes);
                // }
                // }
@@ -662,27 +662,26 @@ public class TopicServiceImpl implements TopicService {
 
                LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
                final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-               // if (user == null) {
+               
                //// String permission =
-               // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+               
                // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
                // String permission = aaf.aafPermissionString(topicName, "manage");
                // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
                // {
                // LOGGER.error("Failed to revoke read access to consumer [" +
                // consumerId + "] for topic " + topicName
-               // + ". Authentication failed.");
+               
                // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
                // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
                // errorMessages.getNotPermitted1()+" <Grant consume permissions>
-               // "+errorMessages.getNotPermitted2()+ topicName);
+               
                // LOGGER.info(errRes);
                // throw new DMaaPAccessDeniedException(errRes);
                // }
                //
                //
-               // }
-
+       
                Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
 
                if (null == topic) {
index ae2d863..3065928 100644 (file)
@@ -52,7 +52,7 @@ public class TransactionServiceImpl implements TransactionService {
                        throws ConfigDbException, IOException {
 
                /*
-                * ConfigurationReader configReader = dmaapContext.getConfigReader();
+               
                 * 
                 * LOG.info("configReader : "+configReader.toString());
                 * 
@@ -77,7 +77,7 @@ public class TransactionServiceImpl implements TransactionService {
                        IOException {
 
                /*
-                * if (null != transactionId) {
+                
                 * 
                 * ConfigurationReader configReader = dmaapContext.getConfigReader();
                 * 
index 33bc2f4..c8bb073 100644 (file)
@@ -27,7 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-//import kafka.common.TopicExistsException;
+
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.json.JSONArray;
 import org.json.JSONObject;
@@ -50,7 +50,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey;
 @Service
 public class UIServiceImpl implements UIService {
 
-       //private static final Logger LOGGER = Logger.getLogger(UIServiceImpl.class);
+       
        private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIServiceImpl.class);
        /**
         * Returning template of hello page
index dd1e4eb..fdf2d28 100644 (file)
@@ -55,7 +55,7 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
 import com.att.nsa.limits.Blacklist;
 import com.att.nsa.security.NsaAuthenticatorService;
-//import com.att.nsa.security.authenticators.OriginalUebAuthenticator;
+
 import com.att.nsa.security.db.BaseNsaApiDbImpl;
 import com.att.nsa.security.db.NsaApiDb;
 import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
@@ -70,7 +70,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
 @Component
 public class ConfigurationReader {
 
-//     private rrNvReadable settings;
+
        private Broker1 fMetaBroker;
        private ConsumerFactory fConsumerFactory;
        private Publisher fPublisher;
@@ -78,7 +78,7 @@ public class ConfigurationReader {
        @Autowired
        private DMaaPCambriaLimiter fRateLimiter;
        private NsaApiDb<NsaSimpleApiKey> fApiKeyDb;
-       /* private DMaaPTransactionObjDB<DMaaPTransactionObj> fTranDb; */
+       
        private DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager;
        private NsaAuthenticatorService<NsaSimpleApiKey> nsaSecurityManager;
        private static CuratorFramework curator;
@@ -90,7 +90,7 @@ public class ConfigurationReader {
        private Emailer fEmailer;
 
        private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
-       //private static final Logger log = Logger.getLogger(ConfigurationReader.class.toString());
+       
 
        /**
         * constructor to initialize all the values
@@ -129,7 +129,7 @@ public class ConfigurationReader {
                        @Qualifier("dMaaPAuthenticatorImpl") DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager
                        )
                                        throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException {
-               //this.settings = settings;
+               
                this.fMetrics = fMetrics;
                this.zk = zk;
                this.fConfigDb = fConfigDb;
@@ -137,18 +137,18 @@ public class ConfigurationReader {
                ConfigurationReader.curator = curator;
                this.fConsumerFactory = fConsumerFactory;
                this.fMetaBroker = fMetaBroker;
-               //System.out.println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSs  " + fMetaBroker);
+               
                this.q = q;
                this.mmb = mmb;
                this.fApiKeyDb = fApiKeyDb;
-               /* this.fTranDb = fTranDb; */
+               
                this.fSecurityManager = fSecurityManager;
                
                long allowedtimeSkewMs=600000L;
                String strallowedTimeSkewM= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"authentication.allowedTimeSkewMs");
                if(null!=strallowedTimeSkewM)allowedtimeSkewMs= Long.parseLong(strallowedTimeSkewM);
                                
-       //      boolean requireSecureChannel = true;
+       
                //String strrequireSecureChannel= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"aauthentication.requireSecureChannel");
                //if(strrequireSecureChannel!=null)requireSecureChannel=Boolean.parseBoolean(strrequireSecureChannel);
                //this.nsaSecurityManager = new NsaAuthenticatorService<NsaSimpleApiKey>(this.fApiKeyDb, settings.getLong("authentication.allowedTimeSkewMs", 600000L), settings.getBoolean("authentication.requireSecureChannel", true));
index 4c38d57..214aac8 100644 (file)
@@ -130,10 +130,10 @@ public class DMaaPResponseBuilder {
         */
        public static void respondOkWithStream(DMaaPContext ctx, String mediaType, StreamWriter writer) throws IOException {
                ctx.getResponse().setStatus(200);
-               OutputStream os = getStreamForBinaryResponse(ctx, mediaType);
-               writer.write(os);
-               os.close();
-               
+               try(OutputStream os = getStreamForBinaryResponse(ctx, mediaType)) {
+                       writer.write(os);
+               }
+
                
        }
 
@@ -218,7 +218,7 @@ public class DMaaPResponseBuilder {
        /**
         * interface used to define write method for outputStream
         */
-       public static abstract interface StreamWriter {
+       public abstract static interface StreamWriter {
                /**
                 * abstract method used to write the response
                 * 
@@ -252,27 +252,20 @@ public class DMaaPResponseBuilder {
                
 
                boolean fResponseEntityAllowed = (!(ctx.getRequest().getMethod().equalsIgnoreCase("HEAD")));
-
-               OutputStream os = null;
-               try{
+               
                if (fResponseEntityAllowed) {
-                       os = ctx.getResponse().getOutputStream();
-                       return os;
+                       try(OutputStream os = ctx.getResponse().getOutputStream()){
+                               return os;
+                       }catch (Exception e){
+                               log.error("Exception in getStreamForBinaryResponse",e);
+                               throw new IOException();
+                       }
                } else {
-                       os = new NullStream();
-                       return os;
-               }
-               }catch (Exception e){
-                       throw new IOException();
-                       
-               }
-               finally{
-                       if(null != os){
-                               try{
-                                       os.close();
-                               }catch(Exception e) {
-                                        
-                               }
+                       try(OutputStream os = new NullStream()){
+                               return os;
+                       }catch (Exception e){
+                               log.error("Exception in getStreamForBinaryResponse",e);
+                               throw new IOException();
                        }
                }
        }
index 58c9fc9..000869e 100644 (file)
@@ -39,9 +39,9 @@ public class PropertyReader extends nvReadableStack {
         * initializing logger
         * 
         */
-       //private static final Logger LOGGER = Logger.getLogger(PropertyReader.class);
+       
        private static final EELFLogger log = EELFManager.getInstance().getLogger(PropertyReader.class);
-//     private static final String MSGRTR_PROPERTIES_FILE = "msgRtrApi.properties";
+
 
        /**
         * constructor initialization
@@ -50,11 +50,11 @@ public class PropertyReader extends nvReadableStack {
         * 
         */
        public PropertyReader() throws loadException {
-       /*      Map<String, String> argMap = new HashMap<String, String>();
-               final String config = getSetting(argMap, CambriaConstants.kConfig, MSGRTR_PROPERTIES_FILE);
-               final URL settingStream = findStream(config, ConfigurationReader.class);
-               push(new nvPropertiesFile(settingStream));
-               push(new nvReadableTable(argMap));*/
+       
+               
+               
+       
+               
        }
 
        /**
@@ -83,43 +83,43 @@ public class PropertyReader extends nvReadableStack {
         * @exception MalformedURLException
         * 
         */
-       /*public static URL findStream(final String resourceName, Class<?> clazz) {
-               try {
-                       File file = new File(resourceName);
+       
+               
+               
 
-                       if (file.isAbsolute()) {
-                               return file.toURI().toURL();
-                       }
+                       
+                       
+               
 
-                       String filesRoot = System.getProperty("RRWT_FILES", null);
+               
 
-                       if (null != filesRoot) {
+                       
 
-                               String fullPath = filesRoot + "/" + resourceName;
+                               
 
-                               LOGGER.debug("Looking for [" + fullPath + "].");
+                               
 
-                               file = new File(fullPath);
-                               if (file.exists()) {
-                                       return file.toURI().toURL();
-                               }
-                       }
+                               
+                       
+                                       
+                       
+               
 
-                       URL res = clazz.getClassLoader().getResource(resourceName);
+                       
 
-                       if (null != res) {
-                               return res;
-                       }
+                       
+                               
+               
 
-                       res = ClassLoader.getSystemResource(resourceName);
+                       
+
+                       
+                               
+               
+               
+                       
+       
+               
+       
 
-                       if (null != res) {
-                               return res;
-                       }
-               } catch (MalformedURLException e) {
-                       LOGGER.error("Unexpected failure to convert a local filename into a URL: " + e.getMessage(), e);
-               }
-               return null;
-       }
-*/
 }
index 08380fb..0e2804e 100644 (file)
@@ -86,7 +86,7 @@ public class DMaaPMetricsSender implements Runnable {
                        String Setting_CambriaTopic=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaTopic);
                        if(Setting_CambriaTopic==null) Setting_CambriaTopic = "msgrtr.apinode.metrics.dmaap";     
                        
-       //              Setting_CambriaBaseUrl=Setting_CambriaBaseUrl==null?defaultTopic:Setting_CambriaBaseUrl;
+       
                        
                        String Setting_CambriaSendFreqSecs=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaSendFreqSecs);
                        
@@ -179,7 +179,7 @@ public class DMaaPMetricsSender implements Runnable {
        private final CambriaPublisher fCambria;
        private final String fHostname;
 
-       //private static final Logger log = LoggerFactory.getLogger(MetricsSender.class);
+       
 
        private static final EELFLogger log = EELFManager.getInstance().getLogger(MetricsSender.class);
        /**
index b99f9e6..26f58e0 100644 (file)
@@ -52,7 +52,7 @@ public class ContentLengthFilter implements Filter {
 
        private FilterConfig filterConfig = null;
        DMaaPErrorMessages errorMessages = null;
-       //private Logger log = Logger.getLogger(ContentLengthFilter.class.toString());
+       
        private static final EELFLogger log = EELFManager.getInstance().getLogger(ContentLengthFilter.class);
        /**
         * Default constructor.
@@ -110,7 +110,7 @@ public class ContentLengthFilter implements Filter {
                                        DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), errorMessages.getMsgSizeExceeds()
                                                        + jsonObj.toString());
                        log.info(errRes.toString());
-                       // throw new CambriaApiException(errRes);
+                       
                }
 
        }
index 64e128c..a12e96c 100644 (file)
@@ -24,23 +24,32 @@ import java.util.concurrent.TimeUnit;
 \r
 import org.apache.curator.CuratorZookeeperClient;\r
 import org.apache.curator.framework.CuratorFramework;\r
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;\r
 import org.apache.curator.framework.api.CreateBuilder;\r
 import org.apache.curator.framework.api.CuratorListener;\r
 import org.apache.curator.framework.api.DeleteBuilder;\r
 import org.apache.curator.framework.api.ExistsBuilder;\r
 import org.apache.curator.framework.api.GetACLBuilder;\r
 import org.apache.curator.framework.api.GetChildrenBuilder;\r
+import org.apache.curator.framework.api.GetConfigBuilder;\r
 import org.apache.curator.framework.api.GetDataBuilder;\r
+import org.apache.curator.framework.api.ReconfigBuilder;\r
+import org.apache.curator.framework.api.RemoveWatchesBuilder;\r
 import org.apache.curator.framework.api.SetACLBuilder;\r
 import org.apache.curator.framework.api.SetDataBuilder;\r
 import org.apache.curator.framework.api.SyncBuilder;\r
 import org.apache.curator.framework.api.UnhandledErrorListener;\r
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;\r
 import org.apache.curator.framework.api.transaction.CuratorTransaction;\r
+import org.apache.curator.framework.api.transaction.TransactionOp;\r
 import org.apache.curator.framework.imps.CuratorFrameworkState;\r
 import org.apache.curator.framework.listen.Listenable;\r
+import org.apache.curator.framework.schema.SchemaSet;\r
+import org.apache.curator.framework.state.ConnectionStateErrorPolicy;\r
 import org.apache.curator.framework.state.ConnectionStateListener;\r
 import org.apache.curator.utils.EnsurePath;\r
 import org.apache.zookeeper.Watcher;\r
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;\r
 \r
 public class CuratorFrameworkImpl implements CuratorFramework {\r
 \r
@@ -200,4 +209,70 @@ public class CuratorFrameworkImpl implements CuratorFramework {
                return null;\r
        }\r
 \r
+       @Override\r
+       public ReconfigBuilder reconfig() {\r
+               // TODO Auto-generated method stub\r
+               return null;\r
+       }\r
+\r
+       @Override\r
+       public GetConfigBuilder getConfig() {\r
+               // TODO Auto-generated method stub\r
+               return null;\r
+       }\r
+\r
+       @Override\r
+       public CuratorMultiTransaction transaction() {\r
+               // TODO Auto-generated method stub\r
+               return null;\r
+       }\r
+\r
+       @Override\r
+       public TransactionOp transactionOp() {\r
+               // TODO Auto-generated method stub\r
+               return null;\r
+       }\r
+\r
+       @Override\r
+       public void createContainers(String path) throws Exception {\r
+               // TODO Auto-generated method stub\r
+               \r
+       }\r
+\r
+       @Override\r
+       public RemoveWatchesBuilder watches() {\r
+               // TODO Auto-generated method stub\r
+               return null;\r
+       }\r
+\r
+       @Override\r
+       public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() {\r
+               // TODO Auto-generated method stub\r
+               return null;\r
+       }\r
+\r
+       @Override\r
+       public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() {\r
+               // TODO Auto-generated method stub\r
+               return null;\r
+       }\r
+\r
+       @Override\r
+       public QuorumVerifier getCurrentConfig() {\r
+               // TODO Auto-generated method stub\r
+               return null;\r
+       }\r
+\r
+       @Override\r
+       public SchemaSet getSchemaSet() {\r
+               // TODO Auto-generated method stub\r
+               return null;\r
+       }\r
+\r
+       @Override\r
+       public boolean isZk34CompatibilityMode() {\r
+               // TODO Auto-generated method stub\r
+               return false;\r
+       }\r
+\r
 }\r
index 7809677..61001b0 100644 (file)
@@ -27,7 +27,7 @@
 
 major=1
 minor=1
-patch=5
+patch=7
 
 base_version=${major}.${minor}.${patch}