Fixes in "appc-dmaap-adapter-bundle"
[appc.git] / appc-adapters / appc-dmaap-adapter / appc-dmaap-adapter-bundle / src / main / java / org / onap / appc / adapter / messaging / dmaap / http / HttpDmaapConsumerImpl.java
index 822dfdd..5149e0c 100644 (file)
 
 package org.onap.appc.adapter.messaging.dmaap.http;
 
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
 import org.apache.http.HttpEntity;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -44,7 +43,7 @@ import org.onap.appc.adapter.message.Consumer;
 
 public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer {
 
-    private final EELFLogger logger = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
+    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapConsumerImpl.class);
 
     // Default values
     private static final int DEFAULT_TIMEOUT_MS = 15000;
@@ -54,14 +53,15 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
     private List<String> urls;
     private String filter;
 
-
     public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
                                  String filter) {
+
         this(hosts, topicName, consumerName, consumerId, filter, null, null);
     }
 
     public HttpDmaapConsumerImpl(Collection<String> hosts, String topicName, String consumerName, String consumerId,
                                  String filter, String user, String password) {
+
         urls = new ArrayList<>();
         for (String host : hosts) {
             urls.add(String.format(URL_TEMPLATE, formatHostString(host), topicName, consumerName, consumerId));
@@ -72,13 +72,13 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
 
     @Override
     public void updateCredentials(String user, String pass) {
-        logger.debug(String.format("Setting auth to %s for %s", user, this.toString()));
+        LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
         this.setBasicAuth(user, pass);
     }
 
     @Override
     public List<String> fetch(int waitMs, int limit) {
-        logger.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
+        LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
         List<String> out = new ArrayList<>();
         try {
             List<NameValuePair> urlParams = new ArrayList<>();
@@ -92,39 +92,38 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
             builder.setParameters(urlParams);
 
             URI uri = builder.build();
-            logger.info(String.format("GET %s", uri));
+            LOG.info(String.format("GET %s", uri));
             HttpGet request = getReq(uri, waitMs);
             CloseableHttpResponse response = getClient().execute(request);
 
             int httpStatus = response.getStatusLine().getStatusCode();
             HttpEntity entity = response.getEntity();
-            String body = (entity != null) ? EntityUtils.toString(entity) : null;
+            String body = (entity != null) ? entityToString(entity) : null;
 
-            logger.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
+            LOG.debug(String.format("Request to %s completed with status %d and a body size of %s", uri, httpStatus,
                 body != null ? body.length() : "null"));
 
             response.close();
             if (httpStatus == 200 && body != null) {
                 JSONArray json = new JSONArray(body);
-                logger.info(String.format("Got %d messages from DMaaP", json.length()));
+                LOG.info(String.format("Got %d messages from DMaaP", json.length()));
                 for (int i = 0; i < json.length(); i++) {
                     out.add(json.getString(i));
                 }
             } else {
-                logger.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
+                LOG.error(String.format("Did not get 200 from DMaaP. Got %d - %s", httpStatus, body));
                 sleep(waitMs);
             }
         } catch (Exception e) {
             if (urls.size() > 1) {
                 String failedUrl = urls.remove(0);
                 urls.add(failedUrl);
-                logger.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
+                LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s", failedUrl,
                     urls.get(0)));
             }
-            logger.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
+            LOG.error(String.format("Got exception while querying DMaaP. Message: %s", e.getMessage()), e);
             sleep(waitMs);
         }
-
         return out;
     }
 
@@ -139,22 +138,18 @@ public class HttpDmaapConsumerImpl extends CommonHttpClient implements Consumer
         return String.format("Consumer listening to [%s]", hostStr);
     }
 
-    @Override
-    public void useHttps(boolean yes) {
+    String entityToString(HttpEntity entity) throws IOException {
+        return EntityUtils.toString(entity);
     }
 
     private void sleep(int ms) {
-        logger.info(String.format("Sleeping for %ds after failed request", ms / 1000));
+        LOG.info(String.format("Sleeping for %ds after failed request", ms / 1000));
         try {
             Thread.sleep(ms);
         } catch (InterruptedException e1) {
-            logger.error("Interrupted while sleeping", e1);
+            LOG.error("Interrupted while sleeping", e1);
             Thread.currentThread().interrupt();
         }
     }
 
-    @Override
-    public void close() {
-    }
-
 }