fix the code that determines when to publish batch
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRSimplerBatchPublisher.java
index c5eb3ba..8f2a66e 100644 (file)
@@ -131,6 +131,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                private String fTransportype;   
                private String fTopic;
                private int fMaxBatchSize = 100;
+               
                private long fMaxBatchAgeMs = 1000;
                private boolean fCompress = false;
                private int threadOccuranceTime = 50;
@@ -151,7 +152,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
        @Override
        public int send(message msg) {
-               final LinkedList<message> list = new LinkedList<message>();
+               final LinkedList<message> list = new LinkedList<>();
                list.add(msg);
                return send(list);
        }
@@ -178,8 +179,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                try {
                        final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                        if (remains.isEmpty()) {
-                               getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
-                                               + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
+                               getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
+                                       remains.size());
                        }
                } catch (InterruptedException e) {
                        getLog().warn("Possible message loss. " + e.getMessage(), e);
@@ -210,7 +211,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                }
 
                synchronized (this) {
-                       final LinkedList<message> result = new LinkedList<message>();
+                       final LinkedList<message> result = new LinkedList<>();
                        fPending.drainTo(result);
                        return result;
                }
@@ -223,19 +224,17 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
         * @param force
         */
        private synchronized void send(boolean force) {
-               if (force || shouldSendNow()) {
-                       if (!sendBatch()) {
+               if ((force || shouldSendNow()) && !sendBatch()) {
                                getLog().warn("Send failed, " + fPending.size() + " message to send.");
 
                                // note the time for back-off
                                fDontSendUntilMs = sfWaitAfterError + Clock.now();
-                       }
                }
        }
 
        private synchronized boolean shouldSendNow() {
                boolean shouldSend = false;
-               if (fPending.isEmpty()) {
+               if (fPending.size()>0) {
                        final long nowMs = Clock.now();
 
                        shouldSend = (fPending.size() >= fMaxBatchSize);
@@ -282,7 +281,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
        private synchronized boolean sendBatch() {
                // it's possible for this call to be made with an empty list. in this
                // case, just return.
-               if (fPending.size() < 1) {
+               if (fPending.isEmpty()) {
                        return true;
                }
 
@@ -341,37 +340,40 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                DME2Configue();
 
                                Thread.sleep(5);
-                               getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info(String
+                                       .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
+                                               nowMs - fPending.peek().timestamp));
                                sender.setPayload(os.toString());
                                String dmeResponse = sender.sendAndWait(5000L);
 
-                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
+                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
                                getLog().info(logLine);
                                fPending.clear();
                                return true;
                        }
 
-                       if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
-                               final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
-                                               username, password, protocolFlag);
-                               // Here we are checking for error response. If HTTP status
-                               // code is not within the http success response code
-                               // then we consider this as error and return false
-                               if (result.getInt("status") < 200 || result.getInt("status") > 299) {
-                                       return false;
-                               }
-                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
-                               getLog().info(logLine);
-                               fPending.clear();
-                               return true;
-                       }
+            if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+                getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
+                final JSONObject result =
+                        postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
+                                .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
+                                .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
+                // Here we are checking for error response. If HTTP status
+                // code is not within the http success response code
+                // then we consider this as error and return false
+                if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+                    return false;
+                }
+                final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+                getLog().info(logLine);
+                fPending.clear();
+                return true;
+            }
 
                        if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
                                                protocolFlag);
 
@@ -388,8 +390,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        }
                        
                        if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
 
                                // Here we are checking for error response. If HTTP status
@@ -403,12 +405,6 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                fPending.clear();
                                return true;
                        }
-               } catch (IllegalArgumentException x) {
-                       getLog().warn(x.getMessage(), x);
-               } catch (IOException x) {
-                       getLog().warn(x.getMessage(), x);
-               } catch (HttpException x) {
-                       getLog().warn(x.getMessage(), x);
                } catch (Exception x) {
                        getLog().warn(x.getMessage(), x);
                }
@@ -474,8 +470,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                        DME2Configue();
 
                                        Thread.sleep(5);
-                                       getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
-                                                       + (nowMs - fPending.peek().timestamp) + " ms");
+                                       getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
+                                                       nowMs - fPending.peek().timestamp);
                                        sender.setPayload(os.toString());
 
                                        String dmeResponse = sender.sendAndWait(5000L);
@@ -512,8 +508,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        }
 
                        if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
                                                authDate, username, password, protocolFlag);
                                // Here we are checking for error response. If HTTP status
@@ -535,8 +531,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        }
 
                        if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
                                                password, protocolFlag);
 
@@ -558,8 +554,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        }
                        
                        if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
-                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+                                       nowMs - fPending.peek().timestamp);
                                final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
 
                                // Here we are checking for error response. If HTTP status
@@ -578,7 +574,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                fPending.clear();
                                return pubResponse;
                        }
-               } catch (IllegalArgumentException x) {
+               } catch (IllegalArgumentException | HttpException x) {
                        getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                        pubResponse.setResponseMessage(x.getMessage());
@@ -587,12 +583,6 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                        pubResponse.setResponseMessage(x.getMessage());
-
-               } catch (HttpException x) {
-                       getLog().warn(x.getMessage(), x);
-                       pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
-                       pubResponse.setResponseMessage(x.getMessage());
-
                } catch (Exception x) {
                        getLog().warn(x.getMessage(), x);
 
@@ -602,7 +592,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                }
 
                finally {
-                       if (fPending.size() > 0) {
+                       if (!fPending.isEmpty()) {
                                getLog().warn("Send failed, " + fPending.size() + " message to send.");
                                pubResponse.setPendingMsgs(fPending.size());
                        }
@@ -833,7 +823,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                fMaxBatchAgeMs = maxBatchAgeMs;
                fCompress = compress;
 
-               fPending = new LinkedBlockingQueue<TimestampedMessage>();
+               fPending = new LinkedBlockingQueue<>();
                fDontSendUntilMs = 0;
                fExec = new ScheduledThreadPoolExecutor(1);
                pubResponse = new MRPublisherResponse();
@@ -855,7 +845,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                fMaxBatchAgeMs = maxBatchAgeMs;
                fCompress = compress;
                threadOccuranceTime = httpThreadOccurnace;
-               fPending = new LinkedBlockingQueue<TimestampedMessage>();
+               fPending = new LinkedBlockingQueue<>();
                fDontSendUntilMs = 0;
                fExec = new ScheduledThreadPoolExecutor(1);
                fExec.scheduleAtFixedRate(new Runnable() {