X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fmessagerouter%2Fmsgrtr.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fdmf%2Fmr%2Fresources%2FCambriaOutboundEventStream.java;h=9ef2139758368b75230597c068faaca94cebbd59;hp=86a6ef8489ac5333cf6554b613772683aa32bb0d;hb=17a3af28994f91f6ebca3df89e43a29ccc4482ff;hpb=342a2bdb5ce6c52abb852ca6c4fc85c7a928d027 diff --git a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java index 86a6ef8..9ef2139 100644 --- a/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java +++ b/src/main/java/com/att/dmf/mr/resources/CambriaOutboundEventStream.java @@ -171,7 +171,7 @@ public class CambriaOutboundEventStream implements StreamWriter { } // public Builder atOffset ( int pos ) - // { + // fOffset = pos; // return this; // } @@ -197,7 +197,7 @@ public class CambriaOutboundEventStream implements StreamWriter { fConsumer = builder.fConsumer; fLimit = builder.fLimit; fTimeoutMs = builder.fTimeoutMs; - // fSettings = builder.fSettings; + fSent = 0; fPretty = builder.fPretty; fWithMeta = builder.fWithMeta; @@ -244,7 +244,7 @@ public class CambriaOutboundEventStream implements StreamWriter { * @param msg * @throws IOException */ - // void onMessage(int count, Message msg) throws IOException; + void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException; } @@ -263,7 +263,7 @@ public class CambriaOutboundEventStream implements StreamWriter { * throws IOException */ public void write(final OutputStream os) throws IOException { - // final boolean transactionEnabled = topic.isTransactionEnabled(); + // final boolean transactionEnabled = isTransEnabled(); // final boolean transactionEnabled = istransEnable; // synchronized(this){ @@ -282,7 +282,7 @@ public class CambriaOutboundEventStream implements StreamWriter { entry.put("message", msg); os.write(entry.toString().getBytes()); } else { - // os.write(message.getBytes()); + String jsonString = JSONObject.valueToString(msg); os.write(jsonString.getBytes()); } @@ -299,7 +299,7 @@ public class CambriaOutboundEventStream implements StreamWriter { try { if (istransEnable && istransType) { // final String transactionId = - // jsonMessage.getString("transactionId"); + // responseTransactionId = transId; StringBuilder consumerInfo = new StringBuilder(); if (null != dmaapContext && null != dmaapContext.getRequest()) { @@ -338,7 +338,7 @@ public class CambriaOutboundEventStream implements StreamWriter { } }); - // if (null != dmaapContext && isTransactionEnabled()) { + if (null != dmaapContext && istransEnable && istransType) { dmaapContext.getResponse().setHeader("transactionId", @@ -353,10 +353,10 @@ public class CambriaOutboundEventStream implements StreamWriter { if (null != strclose_out_stream) close_out_stream = Boolean.parseBoolean(strclose_out_stream); - // if (fSettings.getBoolean("close.output.stream", true)) { + if (close_out_stream) { os.close(); - // } + } }