X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fmr%2Fclient%2Fimpl%2FMRSimplerBatchPublisher.java;h=bd140cd76e95fd4aba9b2d89f572774782af1619;hb=78ebc9a64fac6231e3e594200b9335a4c6372ed1;hp=9b969a6dba7c6f446f9665c066d2d2a7f3c268d9;hpb=8a00d4324ea7621df1a1febb8936df2807cbc443;p=dmaap%2Fmessagerouter%2Fdmaapclient.git diff --git a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java index 9b969a6..bd140cd 100644 --- a/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/org/onap/dmaap/mr/client/impl/MRSimplerBatchPublisher.java @@ -59,9 +59,18 @@ import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher { private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class); + private static final String PROPS_PROTOCOL = "Protocol"; + private static final String PROPS_PARTITION = "partition"; + private static final String PROPS_CONTENT_TYPE = "contenttype"; + + private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip"; + private static final String CONTENT_TYPE_CAMBRIA = "application/cambria"; + private static final String CONTENT_TYPE_JSON = "application/json"; + private static final String CONTENT_TYPE_TEXT = "text/plain"; + + private static final String JSON_STATUS = "status"; + public static class Builder { - public Builder() { - } public Builder againstUrls(Collection baseUrls) { fUrls = baseUrls; @@ -278,6 +287,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP return jsonArray; } + private void logTime(long startMs, String dmeResponse) { + if (getLog().isInfoEnabled()) { + getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse); + } + } + private synchronized boolean sendBatch() { // it's possible for this call to be made with an empty list. in this // case, just return. @@ -291,28 +306,28 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP host = this.fHostSelector.selectBaseHost(); } - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), - props.getProperty("partition")); + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), + props.getProperty(PROPS_PARTITION)); try { final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); OutputStream os = baseStream; - final String contentType = props.getProperty("contenttype"); - if (contentType.equalsIgnoreCase("application/json")) { + final String contentType = props.getProperty(PROPS_CONTENT_TYPE); + if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { JSONArray jsonArray = parseJSON(); os.write(jsonArray.toString().getBytes()); os.close(); - } else if (contentType.equalsIgnoreCase("text/plain")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { for (TimestampedMessage m : fPending) { os.write(m.fMsg.getBytes()); os.write('\n'); } os.close(); - } else if (contentType.equalsIgnoreCase("application/cambria") - || (contentType.equalsIgnoreCase("application/cambria-zip"))) { - if (contentType.equalsIgnoreCase("application/cambria-zip")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) + || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { + if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { os = new GZIPOutputStream(baseStream); } for (TimestampedMessage m : fPending) { @@ -339,15 +354,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP DME2Configue(); - Thread.sleep(5); + this.wait(5); getLog().info(String .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath, nowMs - fPending.peek().timestamp)); sender.setPayload(os.toString()); String dmeResponse = sender.sendAndWait(5000L); - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse; - getLog().info(logLine); + logTime(startMs, dmeResponse); fPending.clear(); return true; } @@ -362,11 +376,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -380,11 +393,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -397,11 +409,10 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP // Here we are checking for error response. If HTTP status // code is not within the http success response code // then we consider this as error and return false - if (result.getInt("status") < 200 || result.getInt("status") > 299) { + if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) { return false; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result.toString()); fPending.clear(); return true; } @@ -424,25 +435,25 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP host = this.fHostSelector.selectBaseHost(); - final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), - props.getProperty("partition")); + final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL), + props.getProperty(PROPS_PARTITION)); OutputStream os = null; try { final ByteArrayOutputStream baseStream = new ByteArrayOutputStream(); os = baseStream; - final String contentType = props.getProperty("contenttype"); - if (contentType.equalsIgnoreCase("application/json")) { + final String contentType = props.getProperty(PROPS_CONTENT_TYPE); + if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) { JSONArray jsonArray = parseJSON(); os.write(jsonArray.toString().getBytes()); - } else if (contentType.equalsIgnoreCase("text/plain")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) { for (TimestampedMessage m : fPending) { os.write(m.fMsg.getBytes()); os.write('\n'); } - } else if (contentType.equalsIgnoreCase("application/cambria") - || (contentType.equalsIgnoreCase("application/cambria-zip"))) { - if (contentType.equalsIgnoreCase("application/cambria-zip")) { + } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA) + || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) { + if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) { os = new GZIPOutputStream(baseStream); } for (TimestampedMessage m : fPending) { @@ -469,7 +480,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP try { DME2Configue(); - Thread.sleep(5); + this.wait(5); getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath, nowMs - fPending.peek().timestamp); sender.setPayload(os.toString()); @@ -524,8 +535,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP return pubResponse; } - final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString(); - getLog().info(logLine); + logTime(startMs, result); fPending.clear(); return pubResponse; } @@ -618,12 +628,12 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP mrPubResponse.setResponseMessage("Please verify the Producer properties"); } else if (reply.startsWith("{")) { JSONObject jObject = new JSONObject(reply); - if (jObject.has("message") && jObject.has("status")) { + if (jObject.has("message") && jObject.has(JSON_STATUS)) { String message = jObject.getString("message"); if (null != message) { mrPubResponse.setResponseMessage(message); } - mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status"))); + mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS))); } else { mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK)); mrPubResponse.setResponseMessage(reply); @@ -730,11 +740,11 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP routeOffer = props.getProperty("routeOffer"); subContextPath = props.getProperty("SubContextPath") + fTopic; - protocol = props.getProperty("Protocol"); + protocol = props.getProperty(PROPS_PROTOCOL); methodType = props.getProperty("MethodType"); dmeuser = props.getProperty("username"); dmepassword = props.getProperty("password"); - contentType = props.getProperty("contenttype"); + contentType = props.getProperty(PROPS_CONTENT_TYPE); handlers = props.getProperty("sessionstickinessrequired"); routerFilePath = props.getProperty("DME2preferredRouterFilePath"); @@ -744,7 +754,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP * routeOffer value for auto failover within a cluster */ - String partitionKey = props.getProperty("partition"); + String partitionKey = props.getProperty(PROPS_PARTITION); if (partner != null && !partner.isEmpty()) { url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="