X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fmr%2Fclient%2Fimpl%2FMRSimplerBatchPublisher.java;h=bd140cd76e95fd4aba9b2d89f572774782af1619;hb=78ebc9a64fac6231e3e594200b9335a4c6372ed1;hp=fb57b940601ad5b12905e0e044fe94d79bccc1e9;hpb=437cec5434271167a15c4fcdd19ab60d84774c13;p=dmaap%2Fmessagerouter%2Fdmaapclient.git diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java index fb57b94..bd140cd 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java @@ -59,9 +59,18 @@ import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher { private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class); + private static final String PROPS_PROTOCOL = "Protocol"; + private static final String PROPS_PARTITION = "partition"; + private static final String PROPS_CONTENT_TYPE = "contenttype"; + + private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip"; + private static final String CONTENT_TYPE_CAMBRIA = "application/cambria"; + private static final String CONTENT_TYPE_JSON = "application/json"; + private static final String CONTENT_TYPE_TEXT = "text/plain"; + + private static final String JSON_STATUS = "status"; + public static class Builder { - public Builder() { - } public Builder againstUrls(Collection baseUrls) { fUrls = baseUrls; @@ -131,6 +140,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP private String fTransportype; private String fTopic; private int fMaxBatchSize = 100; + private long fMaxBatchAgeMs = 1000; private boolean fCompress = false; private int threadOccuranceTime = 50; @@ -178,8 +188,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP try { final List remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); if (remains.isEmpty()) { - getLog().warn("Closing publisher with " + remains.size() + " messages unsent. " - + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close."); + getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.", + remains.size()); } } catch (InterruptedException e) { getLog().warn("Possible message loss. " + e.getMessage(), e); @@ -233,7 +243,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP private synchronized boolean shouldSendNow() { boolean shouldSend = false; - if (fPending.isEmpty()) { + if (fPending.size()>0) { final long nowMs = Clock.now(); shouldSend = (fPending.size() >= fMaxBatchSize); @@ -277,6 +287,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP return jsonArray; } + private void logTime(long startMs, String dmeResponse) { + if (getLog().isInfoEnabled()) { + getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse); + } + } + private synchronized boolean sendBatch() { // it's possible for this call to be made with an empty list. in this // case, just return. @@ -290,28 +306,28 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP host = this.fHostSelector.selectBaseHost(); } - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), - props.getProperty("partition")); + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), + props.getProperty(PROPS_PARTITION)); try { final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); OutputStream os = baseStream; - final String contentType = props.getProperty("contenttype"); - if (contentType.equalsIgnoreCase("application/json")) { + final String contentType = props.getProperty(PROPS_CONTENT_TYPE); + if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { JSONArray jsonArray = parseJSON(); os.write(jsonArray.toString().getBytes()); os.close(); - } else if (contentType.equalsIgnoreCase("text/plain")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { for (TimestampedMessage m : fPending) { os.write(m.fMsg.getBytes()); os.write('\n'); } os.close(); - } else if (contentType.equalsIgnoreCase("application/cambria") - || (contentType.equalsIgnoreCase("application/cambria-zip"))) { - if (contentType.equalsIgnoreCase("application/cambria-zip")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) + || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { + if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { os = new GZIPOutputStream(baseStream); } for (TimestampedMessage m : fPending) { @@ -338,21 +354,21 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP DME2Configue(); - Thread.sleep(5); - getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: " - + (nowMs - fPending.peek().timestamp) + " ms"); + this.wait(5); + getLog().info(String + .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath, + nowMs - fPending.peek().timestamp)); sender.setPayload(os.toString()); String dmeResponse = sender.sendAndWait(5000L); - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse; - getLog().info(logLine); + logTime(startMs, dmeResponse); fPending.clear(); return true; } if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " - + (nowMs - fPending.peek().timestamp) + " ms"); + getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, + nowMs - fPending.peek().timestamp); final JSONObject result = postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray()) .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate) @@ -360,46 +376,43 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " - + (nowMs - fPending.peek().timestamp) + " ms"); + getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, + nowMs - fPending.peek().timestamp); final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " - + (nowMs - fPending.peek().timestamp) + " ms"); + getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, + nowMs - fPending.peek().timestamp); final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType); // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -422,25 +435,25 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP host = this.fHostSelector.selectBaseHost(); - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), - props.getProperty("partition")); + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), + props.getProperty(PROPS_PARTITION)); OutputStream os = null; try { final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); os = baseStream; - final String contentType = props.getProperty("contenttype"); - if (contentType.equalsIgnoreCase("application/json")) { + final String contentType = props.getProperty(PROPS_CONTENT_TYPE); + if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { JSONArray jsonArray = parseJSON(); os.write(jsonArray.toString().getBytes()); - } else if (contentType.equalsIgnoreCase("text/plain")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { for (TimestampedMessage m : fPending) { os.write(m.fMsg.getBytes()); os.write('\n'); } - } else if (contentType.equalsIgnoreCase("application/cambria") - || (contentType.equalsIgnoreCase("application/cambria-zip"))) { - if (contentType.equalsIgnoreCase("application/cambria-zip")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) + || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { + if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { os = new GZIPOutputStream(baseStream); } for (TimestampedMessage m : fPending) { @@ -467,9 +480,9 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP try { DME2Configue(); - Thread.sleep(5); - getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: " - + (nowMs - fPending.peek().timestamp) + " ms"); + this.wait(5); + getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath, + nowMs - fPending.peek().timestamp); sender.setPayload(os.toString()); String dmeResponse = sender.sendAndWait(5000L); @@ -506,8 +519,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP } if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " - + (nowMs - fPending.peek().timestamp) + " ms"); + getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, + nowMs - fPending.peek().timestamp); final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password, protocolFlag); // Here we are checking for error response. If HTTP status @@ -522,15 +535,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP return pubResponse; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result); fPending.clear(); return pubResponse; } if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " - + (nowMs - fPending.peek().timestamp) + " ms"); + getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, + nowMs - fPending.peek().timestamp); final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag); @@ -552,8 +564,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP } if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) { - getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " - + (nowMs - fPending.peek().timestamp) + " ms"); + getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl, + nowMs - fPending.peek().timestamp); final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType); // Here we are checking for error response. If HTTP status @@ -572,7 +584,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP fPending.clear(); return pubResponse; } - } catch (IllegalArgumentException x) { + } catch (IllegalArgumentException | HttpException x) { getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); pubResponse.setResponseMessage(x.getMessage()); @@ -581,12 +593,6 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP getLog().warn(x.getMessage(), x); pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); pubResponse.setResponseMessage(x.getMessage()); - - } catch (HttpException x) { - getLog().warn(x.getMessage(), x); - pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST)); - pubResponse.setResponseMessage(x.getMessage()); - } catch (Exception x) { getLog().warn(x.getMessage(), x); @@ -622,12 +628,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP mrPubResponse.setResponseMessage("Please verify the Producer properties"); } else if (reply.startsWith("{")) { JSONObject jObject = new JSONObject(reply); - if (jObject.has("message") && jObject.has("status")) { + if (jObject.has("message") && jObject.has(JSON_STATUS)) { String message = jObject.getString("message"); if (null != message) { mrPubResponse.setResponseMessage(message); } - mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status"))); + mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS))); } else { mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); mrPubResponse.setResponseMessage(reply); @@ -711,6 +717,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP public void setProps(Properties props) { this.props = props; + setClientConfig(DmaapClientUtil.getClientConfig(props)); } public String getProtocolFlag() { @@ -733,11 +740,11 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP routeOffer = props.getProperty("routeOffer"); subContextPath = props.getProperty("SubContextPath") + fTopic; - protocol = props.getProperty("Protocol"); + protocol = props.getProperty(PROPS_PROTOCOL); methodType = props.getProperty("MethodType"); dmeuser = props.getProperty("username"); dmepassword = props.getProperty("password"); - contentType = props.getProperty("contenttype"); + contentType = props.getProperty(PROPS_CONTENT_TYPE); handlers = props.getProperty("sessionstickinessrequired"); routerFilePath = props.getProperty("DME2preferredRouterFilePath"); @@ -747,7 +754,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP * routeOffer value for auto failover within a cluster */ - String partitionKey = props.getProperty("partition"); + String partitionKey = props.getProperty(PROPS_PARTITION); if (partner != null && !partner.isEmpty()) { url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="