First sonar issues review
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRSimplerBatchPublisher.java
index fb57b94..bd140cd 100644 (file)
@@ -59,9 +59,18 @@ import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
        private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
 
+       private static final String PROPS_PROTOCOL = "Protocol";
+       private static final String PROPS_PARTITION = "partition";
+       private static final String PROPS_CONTENT_TYPE = "contenttype";
+
+       private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip";
+       private static final String CONTENT_TYPE_CAMBRIA = "application/cambria";
+       private static final String CONTENT_TYPE_JSON = "application/json";
+       private static final String CONTENT_TYPE_TEXT = "text/plain";
+
+       private static final String JSON_STATUS = "status";
+
        public static class Builder {
-               public Builder() {
-               }
 
                public Builder againstUrls(Collection<String> baseUrls) {
                        fUrls = baseUrls;
@@ -131,6 +140,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                private String fTransportype;   
                private String fTopic;
                private int fMaxBatchSize = 100;
+               
                private long fMaxBatchAgeMs = 1000;
                private boolean fCompress = false;
                private int threadOccuranceTime = 50;
@@ -178,8 +188,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                try {
                        final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                        if (remains.isEmpty()) {
-                               getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
-                                               + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
+                               getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
+                                       remains.size());
                        }
                } catch (InterruptedException e) {
                        getLog().warn("Possible message loss. " + e.getMessage(), e);
@@ -233,7 +243,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
        private synchronized boolean shouldSendNow() {
                boolean shouldSend = false;
-               if (fPending.isEmpty()) {
+               if (fPending.size()>0) {
                        final long nowMs = Clock.now();
 
                        shouldSend = (fPending.size() >= fMaxBatchSize);
@@ -277,6 +287,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                return jsonArray;
        }
 
+       private void logTime(long startMs, String dmeResponse) {
+               if (getLog().isInfoEnabled()) {
+                       getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse);
+               }
+       }
+
        private synchronized boolean sendBatch() {
                // it's possible for this call to be made with an empty list. in this
                // case, just return.
@@ -290,28 +306,28 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        host = this.fHostSelector.selectBaseHost();
                }
 
-               final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
-                               props.getProperty("partition"));
+               final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
+                               props.getProperty(PROPS_PARTITION));
 
                try {
                        
                        final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
                        OutputStream os = baseStream;
-                       final String contentType = props.getProperty("contenttype");
-                       if (contentType.equalsIgnoreCase("application/json")) {
+                       final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
+                       if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
                                JSONArray jsonArray = parseJSON();
                                os.write(jsonArray.toString().getBytes());
                                os.close();
 
-                       } else if (contentType.equalsIgnoreCase("text/plain")) {
+                       } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
                                for (TimestampedMessage m : fPending) {
                                        os.write(m.fMsg.getBytes());
                                        os.write('\n');
                                }
                                os.close();
-                       } else if (contentType.equalsIgnoreCase("application/cambria")
-                                       || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
-                               if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+                       } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
+                                       || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
+                               if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
                                        os = new GZIPOutputStream(baseStream);
                                }
                                for (TimestampedMessage m : fPending) {
@@ -338,21 +354,21 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
                                DME2Configue();
 
-                               Thread.sleep(5);
-                               getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               this.wait(5);
+                               getLog().info(String
+                                       .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
+                                               nowMs - fPending.peek().timestamp));
                                sender.setPayload(os.toString());
                                String dmeResponse = sender.sendAndWait(5000L);
 
-                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
-                               getLog().info(logLine);
+                               logTime(startMs, dmeResponse);
                                fPending.clear();
                                return true;
                        }
 
             if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                        + (nowMs - fPending.peek().timestamp) + " ms");
+                getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                 final JSONObject result =
                         postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
                                 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
@@ -360,46 +376,43 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                 // Here we are checking for error response. If HTTP status
                 // code is not within the http success response code
                 // then we consider this as error and return false
-                if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+                if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
                     return false;
                 }
-                final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
-                getLog().info(logLine);
+                               logTime(startMs, result.toString());
                 fPending.clear();
                 return true;
             }
 
                        if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
                                                protocolFlag);
 
                                // Here we are checking for error response. If HTTP status
                                // code is not within the http success response code
                                // then we consider this as error and return false
-                               if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+                               if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
                                        return false;
                                }
-                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
-                               getLog().info(logLine);
+                               logTime(startMs, result.toString());
                                fPending.clear();
                                return true;
                        }
                        
                        if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
 
                                // Here we are checking for error response. If HTTP status
                                // code is not within the http success response code
                                // then we consider this as error and return false
-                               if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+                               if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
                                        return false;
                                }
-                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
-                               getLog().info(logLine);
+                               logTime(startMs, result.toString());
                                fPending.clear();
                                return true;
                        }
@@ -422,25 +435,25 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
                host = this.fHostSelector.selectBaseHost();
 
-               final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
-                               props.getProperty("partition"));
+               final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
+                               props.getProperty(PROPS_PARTITION));
                OutputStream os = null;
                try {
 
                        final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
                        os = baseStream;
-                       final String contentType = props.getProperty("contenttype");
-                       if (contentType.equalsIgnoreCase("application/json")) {
+                       final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
+                       if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
                                JSONArray jsonArray = parseJSON();
                                os.write(jsonArray.toString().getBytes());
-                       } else if (contentType.equalsIgnoreCase("text/plain")) {
+                       } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
                                for (TimestampedMessage m : fPending) {
                                        os.write(m.fMsg.getBytes());
                                        os.write('\n');
                                }
-                       } else if (contentType.equalsIgnoreCase("application/cambria")
-                                       || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
-                               if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+                       } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
+                                       || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
+                               if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
                                        os = new GZIPOutputStream(baseStream);
                                }
                                for (TimestampedMessage m : fPending) {
@@ -467,9 +480,9 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                try {
                                        DME2Configue();
 
-                                       Thread.sleep(5);
-                                       getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
-                                                       + (nowMs - fPending.peek().timestamp) + " ms");
+                                       this.wait(5);
+                                       getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
+                                                       nowMs - fPending.peek().timestamp);
                                        sender.setPayload(os.toString());
 
                                        String dmeResponse = sender.sendAndWait(5000L);
@@ -506,8 +519,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        }
 
                        if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
                                                authDate, username, password, protocolFlag);
                                // Here we are checking for error response. If HTTP status
@@ -522,15 +535,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                        return pubResponse;
                                }
 
-                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
-                               getLog().info(logLine);
+                               logTime(startMs, result);
                                fPending.clear();
                                return pubResponse;
                        }
 
                        if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
                                                password, protocolFlag);
 
@@ -552,8 +564,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        }
                        
                        if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
 
                                // Here we are checking for error response. If HTTP status
@@ -572,7 +584,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                fPending.clear();
                                return pubResponse;
                        }
-               } catch (IllegalArgumentException x) {
+               } catch (IllegalArgumentException | HttpException x) {
                        getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                        pubResponse.setResponseMessage(x.getMessage());
@@ -581,12 +593,6 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                        pubResponse.setResponseMessage(x.getMessage());
-
-               } catch (HttpException x) {
-                       getLog().warn(x.getMessage(), x);
-                       pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
-                       pubResponse.setResponseMessage(x.getMessage());
-
                } catch (Exception x) {
                        getLog().warn(x.getMessage(), x);
 
@@ -622,12 +628,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        mrPubResponse.setResponseMessage("Please verify the Producer properties");
                } else if (reply.startsWith("{")) {
                        JSONObject jObject = new JSONObject(reply);
-                       if (jObject.has("message") && jObject.has("status")) {
+                       if (jObject.has("message") && jObject.has(JSON_STATUS)) {
                                String message = jObject.getString("message");
                                if (null != message) {
                                        mrPubResponse.setResponseMessage(message);
                                }
-                               mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+                               mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
                        } else {
                                mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
                                mrPubResponse.setResponseMessage(reply);
@@ -711,6 +717,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
        public void setProps(Properties props) {
                this.props = props;
+               setClientConfig(DmaapClientUtil.getClientConfig(props));
        }
 
        public String getProtocolFlag() {
@@ -733,11 +740,11 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        routeOffer = props.getProperty("routeOffer");
                        subContextPath = props.getProperty("SubContextPath") + fTopic;
                        
-                       protocol = props.getProperty("Protocol");
+                       protocol = props.getProperty(PROPS_PROTOCOL);
                        methodType = props.getProperty("MethodType");
                        dmeuser = props.getProperty("username");
                        dmepassword = props.getProperty("password");
-                       contentType = props.getProperty("contenttype");
+                       contentType = props.getProperty(PROPS_CONTENT_TYPE);
                        handlers = props.getProperty("sessionstickinessrequired");
                        routerFilePath = props.getProperty("DME2preferredRouterFilePath");
 
@@ -747,7 +754,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                         * routeOffer value for auto failover within a cluster
                         */
 
-                       String partitionKey = props.getProperty("partition");
+                       String partitionKey = props.getProperty(PROPS_PARTITION);
 
                        if (partner != null && !partner.isEmpty()) {
                                url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="