Merge "bumped the pom version"
authorRam Koya <rk541m@att.com>
Mon, 27 Aug 2018 14:59:23 +0000 (14:59 +0000)
committerGerrit Code Review <gerrit@onap.org>
Mon, 27 Aug 2018 14:59:23 +0000 (14:59 +0000)
src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java
src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java
src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java

index dc4bcd5..735e372 100644 (file)
@@ -42,10 +42,6 @@ import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
 
 
-//import kafka.javaapi.producer.Producer;
-//import kafka.producer.KeyedMessage;
-//import kafka.producer.ProducerConfig;
-//import kafka.producer.KeyedMessage;
 
 /**
  * Sends raw JSON objects into Kafka.
@@ -77,8 +73,7 @@ public class KafkaPublisher implements Publisher {
                        kafkaConnUrl="localhost:9092";
                }
                
-        //     props.put("bootstrap.servers", bootSever);
-       //System.setProperty("java.security.auth.login.config",jaaspath);
+       
        
                
                transferSetting( props, "bootstrap.servers",kafkaConnUrl);
@@ -93,7 +88,7 @@ public class KafkaPublisher implements Publisher {
                 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
                
-               //fProducer = new Producer<String, String>(fConfig);
+               
                fProducer = new KafkaProducer<>(props);
        }
 
@@ -178,10 +173,10 @@ public class KafkaPublisher implements Publisher {
                        throws IOException {
                log.info("sending " + msgs.size() + " events to [" + topic + "]");
 try{
-               final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size());
+               final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
                        for (message o : msgs) {
                        
-                       final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString());
+                       final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
                        
                
                try {
index 22f0588..8c841d4 100644 (file)
@@ -46,9 +46,9 @@ public class MemoryMetaBroker implements Broker {
         * @param settings
         */
        public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) {
-       //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) {
+       
                fQueue = mq;
-               fTopics = new HashMap<String, MemTopic>();
+               fTopics = new HashMap<>();
        }
 
        @Override
@@ -190,7 +190,7 @@ public class MemoryMetaBroker implements Broker {
 
                @Override
                public Set<String> getOwners() {
-                       final TreeSet<String> set = new TreeSet<String> ();
+                       final TreeSet<String> set = new TreeSet<> ();
                        set.add ( fOwner );
                        return set;
                }
index 5f28367..f0bb982 100644 (file)
@@ -65,7 +65,7 @@ public class DMaaPCambriaLimiter {
        public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
                        throws missingReqdSetting, invalidSettingValue {
                        fRateInfo = new HashMap<String, RateInfo>();
-               fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+               fRateInfoCheck = new HashMap<>();
                fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
                                CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
                fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
@@ -112,7 +112,7 @@ public class DMaaPCambriaLimiter {
         */
        public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
                fRateInfo = new HashMap<String, RateInfo>();
-               fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+               fRateInfoCheck = new HashMap<>();
                fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
                fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
                fWindowLengthMins = windowLengthMins;
index 6fc0838..e4e09c8 100644 (file)
@@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
 import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
 import com.att.dmf.mr.constants.CambriaConstants;
 import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
@@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
  */
 public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
-       // private static final Logger log = LoggerFactory
-       // .getLogger(DMaaPKafkaConsumerFactory.class);
+       
        private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
-       // @Autowired
-       // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new
-       // KafkaLiveLockAvoider();
+       
 
        /**
         * constructor initialization
@@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
 
                final boolean isCacheEnabled = kSetting_EnableCache;
 
-               // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
-               // metrics) : null;
+               
                fCache = null;
                if (isCacheEnabled) {
                        fCache = KafkaConsumerCache.getInstance();
@@ -195,8 +191,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
                                        final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
                                        long fCreateTimeMs = System.currentTimeMillis();
                                        KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
-                                       kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj()
-                                                                                                                                                                                                                               // );
+                                       kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
                                        log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
 
                                        if (fCache != null) {
@@ -265,10 +260,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
        private void transferSettingIfProvided(Properties target, String key, String prefix) {
                String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
 
-               // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
+               
                if (null != keyVal) {
-                       // final String val = fSettings
-                       // .getString(makeLongKey(key, prefix), "");
+               
                        log.info("Setting [" + key + "] to " + keyVal + ".");
                        target.put(key, keyVal);
                }
@@ -294,10 +288,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
                props.put("group.id", fakeGroupName);
                props.put("enable.auto.commit", "false"); // 0.11
                props.put("bootstrap.servers", fkafkaBrokers);
-               /*props.put("sasl.jaas.config",
-                               "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
-               props.put("security.protocol", "SASL_PLAINTEXT");
-               props.put("sasl.mechanism", "PLAIN");*/
+
+               
                props.put("client.id", consumerId);
 
                // additional settings: start with our defaults, then pull in configured
index e29403f..963ff2d 100644 (file)
@@ -23,7 +23,7 @@ package com.att.dmf.mr.beans;
 
 import java.security.Key;
 
-//import org.apache.log4-j.Logger;
+
 import org.springframework.beans.factory.annotation.Autowired;
 
 import com.att.dmf.mr.constants.CambriaConstants;
@@ -48,11 +48,11 @@ import com.att.nsa.util.rrConvertor;
  */
 public class DMaaPNsaApiDb {
        
-       //private rrNvReadable settings;
+       
        private DMaaPZkConfigDb cdb;
        
        //private static final Logger log = Logger
-               //      .getLogger(DMaaPNsaApiDb.class.toString());
+               
        private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class);
        
 /**
@@ -63,7 +63,7 @@ public class DMaaPNsaApiDb {
  */
        @Autowired
        public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) {
-               //this.setSettings(settings);
+               
                this.setCdb(cdb);
        }
        /**
@@ -79,16 +79,16 @@ public class DMaaPNsaApiDb {
                        missingReqdSetting {
                // Cambria uses an encrypted api key db
 
-               //final String keyBase64 = settings.getString("cambria.secureConfig.key",                       null);
+               
                final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key");
                
                
-       //      final String initVectorBase64 = settings.getString(                             "cambria.secureConfig.iv", null);
+       
        final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv");
                // if neither value was provided, don't encrypt api key db
                if (keyBase64 == null && initVectorBase64 == null) {
                        log.info("This server is configured to use an unencrypted API key database. See the settings documentation.");
-                       return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb,
+                       return new BaseNsaApiDbImpl<>(cdb,
                                        new NsaSimpleApiKeyFactory());
                } else if (keyBase64 == null) {
                        // neither or both, otherwise something's goofed
@@ -100,7 +100,7 @@ public class DMaaPNsaApiDb {
                        log.info("This server is configured to use an encrypted API key database.");
                        final Key key = EncryptingLayer.readSecretKey(keyBase64);
                        final byte[] iv = rrConvertor.base64Decode(initVectorBase64);
-                       return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb,
+                       return new EncryptingApiDbImpl<>(cdb,
                                        new NsaSimpleApiKeyFactory(), key, iv);
                }
        }
@@ -109,17 +109,17 @@ public class DMaaPNsaApiDb {
         * @return
         * returns settings
         */
-/*     public rrNvReadable getSettings() {
-               return settings;
-       }*/
+
+               
+       
 
        /**
         * @param settings
         * set settings
         */
-       /*public void setSettings(rrNvReadable settings) {
-               this.settings = settings;
-       }*/
+       
+               
+       
 
         /**
         * @return
index d543721..5aa25fa 100644 (file)
@@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import com.att.dmf.mr.utils.ConfigurationReader;
 import com.att.nsa.configs.confimpl.ZkConfigDb;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import com.att.nsa.configs.confimpl.ZkConfigDb;
+
 /**
  * Provide the zookeeper config db connection 
  * @author nilanjana.maity
@@ -42,7 +42,7 @@ public class DMaaPZkConfigDb extends ZkConfigDb {
        public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
                        @Qualifier("propertyReader") rrNvReadable settings) {
                
-               //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot));
+               
                super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot());
                
        }
index 7f27798..f61b6ea 100644 (file)
@@ -51,7 +51,7 @@ public class DME2EndPointLoader {
        private String protocol;
        private String serviceURL;
        private static DME2EndPointLoader loader = new DME2EndPointLoader();
-//     private static final Logger LOG = LoggerFactory.getLogger(EventsServiceImpl.class);
+
        private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
        private DME2EndPointLoader() {
        }
index 0993aa6..4b219b1 100644 (file)
  *******************************************************************************/
 package com.att.dmf.mr.metrics.publisher;
 
-//import org.slf4j.Logger;
+
 
 //
 import com.att.eelf.configuration.EELFLogger;
-//import com.att.eelf.configuration.EELFManager;
+
 
 /**
  * 
index 1510c32..46dfa99 100644 (file)
@@ -95,7 +95,7 @@ public class CambriaPublisherUtility
         */
        public static List<HttpHost> createHostsList(Collection<String> hosts)
        {
-               final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> ();
+               final ArrayList<HttpHost> convertedHosts = new ArrayList<>();
                for ( String host : hosts )
                {
                        if ( host.length () == 0 ) continue;