Sonar fixes in "appc-dmaap-adapter-bundle" 73/31973/3
authorwejs <maciej.wejs@nokia.com>
Fri, 16 Feb 2018 15:08:04 +0000 (16:08 +0100)
committerPatrick Brady <pb071s@att.com>
Sun, 18 Feb 2018 21:30:58 +0000 (21:30 +0000)
Also CheckStyle fixes.
No major refactoring.

Change-Id: Icf5e7221ecee2611f77bb3212c24d1fad9a27b96
Issue-ID: APPC-646
Signed-off-by: wejs <maciej.wejs@nokia.com>
appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapConsumerImpl.java
appc-adapters/appc-dmaap-adapter/appc-dmaap-adapter-bundle/src/main/java/org/onap/appc/adapter/messaging/dmaap/impl/DmaapProducerImpl.java

index 155f34b..6f907ae 100644 (file)
 
 package org.onap.appc.adapter.messaging.dmaap.impl;
 
-import java.io.IOException;
-import java.util.*;
-
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-//import com.att.nsa.cambria.client.CambriaClientBuilders;
-//import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-//import com.att.nsa.cambria.client.CambriaConsumer;
-
 import com.att.nsa.mr.client.MRClientFactory;
 import com.att.nsa.mr.client.MRConsumer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
 import org.apache.commons.lang3.StringUtils;
 import org.onap.appc.adapter.message.Consumer;
 import org.onap.appc.configuration.Configuration;
 import org.onap.appc.configuration.ConfigurationFactory;
 import org.onap.appc.metricservice.MetricRegistry;
 import org.onap.appc.metricservice.MetricService;
-import org.onap.appc.metricservice.impl.MetricServiceImpl;
+import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
 import org.onap.appc.metricservice.metric.Metric;
 import org.onap.appc.metricservice.metric.MetricType;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
 import org.onap.appc.metricservice.policy.PublishingPolicy;
 import org.onap.appc.metricservice.publisher.LogPublisher;
 import org.osgi.framework.BundleContext;
@@ -53,74 +50,81 @@ import org.osgi.framework.ServiceReference;
 
 public class DmaapConsumerImpl implements Consumer {
 
-    private final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
+    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
     private final Configuration configuration = ConfigurationFactory.getConfiguration();
     // Default values
     private static final int DEFAULT_TIMEOUT_MS = 60000;
     private static final int DEFAULT_LIMIT = 1000;
-    private static MetricRegistry metricRegistry;
     private String topic;
-    private boolean isMetricEnabled=false;
+    private boolean isMetricEnabled = false;
     private boolean useHttps = false;
+    private MetricRegistry metricRegistry;
     private MRConsumer client = null;
     private Properties props = null;
 
 
-    public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) {
-        this(urls, topicName, consumerGroupName, consumerId,user, password,null);
+    public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
+        String user, String password) {
 
+        this(urls, topicName, consumerGroupName, consumerId,user, password,null);
     }
 
-    public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) {
+    public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
+        String user, String password, String filter) {
+
         this.topic = topicName;
         this.props = new Properties();
         String urlsStr = StringUtils.join(urls, ',');
-        props.setProperty("host",urlsStr);
-        props.setProperty("group",consumerGroupName);
-        props.setProperty("id",consumerId);
-        props.setProperty("username",user);
-        props.setProperty("password",password);
-        if(filter != null) {
+        props.setProperty("host", urlsStr);
+        props.setProperty("group", consumerGroupName);
+        props.setProperty("id", consumerId);
+        props.setProperty("username", user);
+        props.setProperty("password", password);
+        if (filter != null) {
             props.setProperty("filter", filter);
         }
     }
 
-
     private void initMetric() {
         LOG.debug("Metric getting initialized");
         MetricService metricService = getMetricservice();
-        metricRegistry = metricService.createRegistry("APPC");
-        DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory().
-                dmaapRequestCounterBuilder().
-                withName("DMAAP_KPI").withType(MetricType.COUNTER).
-                withRecievedMessage(0)
+        if (metricService != null) {
+            metricRegistry = metricService.createRegistry("APPC");
+
+            DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
+                .dmaapRequestCounterBuilder()
+                .withName("DMAAP_KPI").withType(MetricType.COUNTER)
+                .withRecievedMessage(0)
                 .withPublishedMessage(0)
                 .build();
-        if (metricRegistry.register(dmaapKpiMetric)) {
-            Metric[] metrics = new Metric[]{dmaapKpiMetric};
-            LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
-            LogPublisher[] logPublishers = new LogPublisher[1];
-            logPublishers[0] = logPublisher;
-            PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
-                    scheduledPolicyBuilder().withPublishers(logPublishers).
-                    withMetrics(metrics).
-                    build();
-            LOG.debug("Policy getting initialized");
-            manuallyScheduledPublishingPolicy.init();
-            LOG.debug("Metric initialized");
+
+            if (metricRegistry.register(dmaapKpiMetric)) {
+                Metric[] metrics = new Metric[]{dmaapKpiMetric};
+                LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
+                LogPublisher[] logPublishers = new LogPublisher[1];
+                logPublishers[0] = logPublisher;
+
+                PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
+                    .scheduledPolicyBuilder().withPublishers(logPublishers)
+                    .withMetrics(metrics)
+                    .build();
+
+                LOG.debug("Policy getting initialized");
+                manuallyScheduledPublishingPolicy.init();
+                LOG.debug("Metric initialized");
+            }
         }
     }
-    
 
     /**
-     * @return An instance of MRConsumer created from our class variables
+     * @return An instance of MRConsumer created from our class variables.
      */
     private synchronized MRConsumer getClient(int waitMs, int limit) {
         try {
             props.setProperty("timeout",String.valueOf(waitMs));
             props.setProperty("limit",String.valueOf(limit));
             String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props);
-            return MRClientFactory.createConsumer ( topicProducerPropFileName);
+            return MRClientFactory.createConsumer(topicProducerPropFileName);
         } catch (IOException e1) {
             LOG.error("failed to createConsumer",e1);
             return null;
@@ -135,13 +139,18 @@ public class DmaapConsumerImpl implements Consumer {
         client = null;
     }
 
+    @Override
+    public List<String> fetch() {
+        return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
+    }
+
     @Override
     public List<String> fetch(int waitMs, int limit) {
-        Properties properties=configuration.getProperties();
-        if(properties!=null && properties.getProperty("metric.enabled")!=null ){
-          isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
+        Properties properties = configuration.getProperties();
+        if (properties != null && properties.getProperty("metric.enabled") != null) {
+            isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
         }
-        if(isMetricEnabled){
+        if (isMetricEnabled) {
             initMetric();
         }
         LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
@@ -153,32 +162,36 @@ public class DmaapConsumerImpl implements Consumer {
             LOG.info("Getting DMaaP Client ...");
             client = getClient(waitMs, limit);
         }
-        try {
-            for (String s : client.fetch(waitMs, limit)) {
-                out.add(s);
-                if(isMetricEnabled){
-                    ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage();
-                }
-            }
-            LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
-        } catch (Exception e) {
-            // Connection exception
-            LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()));
-            e.printStackTrace();
+        if (client != null) {
             try {
-                LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
-                Thread.sleep(waitMs);
-            } catch (InterruptedException e2) {
-                LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
+                for (String s : client.fetch(waitMs, limit)) {
+                    out.add(s);
+                    incrementReceivedMessage();
+                }
+                LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
+            } catch (Exception e) {
+                // Connection exception
+                LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()), e);
+                try {
+                    LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
+                    Thread.sleep(waitMs);
+                } catch (InterruptedException e2) {
+                    LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
+                    Thread.currentThread().interrupt();
+                }
             }
         }
-
-
         return out;
     }
 
+    private void incrementReceivedMessage() {
+        if (isMetricEnabled && metricRegistry != null) {
+            ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage();
+        }
+    }
+
     /**
-     * Close consumer Dmaap client
+     * Close consumer Dmaap client.
      */
     @Override
     public void close() {
@@ -188,16 +201,11 @@ public class DmaapConsumerImpl implements Consumer {
         }
     }
 
-    @Override
-    public List<String> fetch() {
-        return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
-    }
-
     @Override
     public String toString() {
-        String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host"));
-        String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group"));
-        String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id"));
+        String hostStr = (props == null || props.getProperty("host") == null ? "N/A" : props.getProperty("host"));
+        String group = (props == null || props.getProperty("group") == null ? "N/A" : props.getProperty("group"));
+        String id = (props == null || props.getProperty("id") == null ? "N/A" : props.getProperty("id"));
         return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr);
     }
 
@@ -206,22 +214,17 @@ public class DmaapConsumerImpl implements Consumer {
         useHttps = yes;
     }
 
-
     private MetricService getMetricservice() {
         BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
-        // Get AAIadapter reference
         ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
         if (sref != null) {
             LOG.info("Metric Service from bundlecontext");
-            return (MetricServiceImpl) bctx.getService(sref);
-
+            return (MetricService) bctx.getService(sref);
         } else {
             LOG.info("Metric Service error from bundlecontext");
             LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
             return null;
-
         }
     }
 
-    
 }
index 5f91ca3..3fbfc95 100644 (file)
 
 package org.onap.appc.adapter.messaging.dmaap.impl;
 
-
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 import com.att.nsa.mr.client.MRBatchingPublisher;
 import com.att.nsa.mr.client.MRClientFactory;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.onap.appc.adapter.message.Producer;
-import org.onap.appc.adapter.messaging.dmaap.impl.DmaapUtil;
 import org.onap.appc.configuration.Configuration;
 import org.onap.appc.configuration.ConfigurationFactory;
 import org.onap.appc.metricservice.MetricRegistry;
 import org.onap.appc.metricservice.MetricService;
+import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
 import org.onap.appc.metricservice.metric.Metric;
 import org.onap.appc.metricservice.metric.MetricType;
-import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
 import org.onap.appc.metricservice.policy.PublishingPolicy;
 import org.onap.appc.metricservice.publisher.LogPublisher;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
 import org.osgi.framework.ServiceReference;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
 
 public class DmaapProducerImpl implements Producer {
 
     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducerImpl.class);
     private static final Configuration configuration = ConfigurationFactory.getConfiguration();
 
-    private Set<String> topics = new HashSet<String>();
+    private Set<String> topics;
 
     private Properties props = null;
-    private static MetricRegistry metricRegistry;
+    private MetricRegistry metricRegistry;
     private boolean useHttps = false;
-    private boolean isMetricEnabled=false;
+    private boolean isMetricEnabled = false;
     
     private Set<MRBatchingPublisher> clients;
 
@@ -72,16 +70,14 @@ public class DmaapProducerImpl implements Producer {
         this(urls, (Set<String>)null, user, password);
         this.topics = new HashSet<>();
         if (topicName != null) {
-            for (String topic : topicName.split(",")) {
-                topics.add(topic);
-            }
+            Collections.addAll(topics, topicName.split(","));
         }
     }
 
     public DmaapProducerImpl(Collection<String> urls, Set<String> topicNames, String user, String password) {
         topics = topicNames;
-        if(urls == null || user == null || password == null){
-            throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password" );
+        if (urls == null || user == null || password == null) {
+            throw new IllegalArgumentException("one of these mandaory argument is null: urls, user, password");
         }
         this.props = new Properties();
         String urlsStr = StringUtils.join(urls, ',');
@@ -90,62 +86,69 @@ public class DmaapProducerImpl implements Producer {
         props.setProperty("username",user);
         props.setProperty("password",password);
     }
+
     private void initMetric() {
         LOG.debug("Metric getting initialized");
         MetricService metricService = getMetricservice();
-        metricRegistry=metricService.createRegistry("APPC");
-       DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory().
-                dmaapRequestCounterBuilder().
-                withName("DMAAP_KPI").withType(MetricType.COUNTER).
-                withRecievedMessage(0)
+        if (metricService != null) {
+            metricRegistry = metricService.createRegistry("APPC");
+
+            DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
+                .dmaapRequestCounterBuilder()
+                .withName("DMAAP_KPI").withType(MetricType.COUNTER)
+                .withRecievedMessage(0)
                 .withPublishedMessage(0)
                 .build();
-        if(metricRegistry.register(dmaapKpiMetric)) {
-            Metric[] metrics = new Metric[]{dmaapKpiMetric};
-            LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
-            LogPublisher[] logPublishers = new LogPublisher[1];
-            logPublishers[0] = logPublisher;
-            PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
-                    scheduledPolicyBuilder().withPublishers(logPublishers).
-                    withMetrics(metrics).
-                    build();
-            LOG.debug("Policy getting initialized");
-            manuallyScheduledPublishingPolicy.init();
-            LOG.debug("Metric initialized");
-        }
 
+            if (metricRegistry.register(dmaapKpiMetric)) {
+                Metric[] metrics = new Metric[]{dmaapKpiMetric};
+                LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
+                LogPublisher[] logPublishers = new LogPublisher[1];
+                logPublishers[0] = logPublisher;
+
+                PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
+                    .scheduledPolicyBuilder()
+                    .withPublishers(logPublishers)
+                    .withMetrics(metrics)
+                    .build();
+
+                LOG.debug("Policy getting initialized");
+                manuallyScheduledPublishingPolicy.init();
+                LOG.debug("Metric initialized");
+            }
+        }
     }
+
     private Set<MRBatchingPublisher> getClients() {
         Set<MRBatchingPublisher> out = new HashSet<>();
         for (String topic : topics) {
             try {
                 String topicProducerPropFileName = DmaapUtil.createProducerPropFile(topic,props);
-                final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher (topicProducerPropFileName);
+                final MRBatchingPublisher client = MRClientFactory.createBatchingPublisher(topicProducerPropFileName);
                 out.add(client);
             } catch (Exception e) {
-                LOG.error(e.getMessage());
+                LOG.error(e.getMessage(), e);
             }
         }
-
         return out;
     }
 
     @Override
     public synchronized void updateCredentials(String key, String secret) {
         LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
-        props.setProperty("user",String.valueOf(key));
-        props.setProperty("password",String.valueOf(secret));
+        props.setProperty("user", String.valueOf(key));
+        props.setProperty("password", String.valueOf(secret));
         clients = null;
     }
 
     @Override
     public boolean post(String partition, String data) {
         boolean success = true;
-        Properties properties=configuration.getProperties();
-        if(properties!=null && properties.getProperty("metric.enabled")!=null ){
-            isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
+        Properties properties = configuration.getProperties();
+        if (properties != null && properties.getProperty("metric.enabled") != null ) {
+            isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
         }
-        if(isMetricEnabled){
+        if (isMetricEnabled) {
             initMetric();
         }
         
@@ -161,31 +164,34 @@ public class DmaapProducerImpl implements Producer {
                 LOG.debug(String.format("Posting %s to %s", data, client));
                 client.send(partition, data);
             } catch (IOException e) {
-                LOG.error(e.getMessage());
+                LOG.error(e.getMessage(), e);
                 success = false;
             }
         }
-        if(isMetricEnabled){
-            ( (DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage();
-        }
+        incrementPublishedMessage();
         return success;
     }
 
+    private void incrementPublishedMessage() {
+        if (isMetricEnabled && metricRegistry != null) {
+            ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementPublishedMessage();
+        }
+    }
+
     /**
-     * Close producer Dmaap client
+     * Close producer Dmaap client.
      */
     @Override
     public void close() {
         if ((clients == null) || (clients.isEmpty())) {
             return;
         }
-
         LOG.debug("Closing Dmaap producer clients....");
         for (MRBatchingPublisher client : clients) {
             try {
                 client.close(1, TimeUnit.SECONDS);
             }  catch (IOException | InterruptedException e) {
-                LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client));
+                LOG.warn(String.format("Failed to cleanly close Dmaap connection for [%s]", client), e);
             }
         }
     }
@@ -201,16 +207,11 @@ public class DmaapProducerImpl implements Producer {
         if (sref != null) {
             LOG.info("Metric Service from bundlecontext");
             return (MetricService) bctx.getService(sref);
-
         } else {
             LOG.info("Metric Service error from bundlecontext");
             LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
             return null;
-
         }
     }
 
-    public  Metric getMetric(String name){
-        return metricRegistry.metric(name);
-    }
 }