}
// public Builder atOffset ( int pos )
- // {
+
// fOffset = pos;
// return this;
// }
fConsumer = builder.fConsumer;
fLimit = builder.fLimit;
fTimeoutMs = builder.fTimeoutMs;
- // fSettings = builder.fSettings;
+
fSent = 0;
fPretty = builder.fPretty;
fWithMeta = builder.fWithMeta;
* @param msg
* @throws IOException
*/
- // void onMessage(int count, Message msg) throws IOException;
+
void onMessage(int count, String msg, String transId, long offSet) throws IOException, JSONException;
}
* throws IOException
*/
public void write(final OutputStream os) throws IOException {
- // final boolean transactionEnabled = topic.isTransactionEnabled();
+
// final boolean transactionEnabled = isTransEnabled();
// final boolean transactionEnabled = istransEnable;
// synchronized(this){
entry.put("message", msg);
os.write(entry.toString().getBytes());
} else {
- // os.write(message.getBytes());
+
String jsonString = JSONObject.valueToString(msg);
os.write(jsonString.getBytes());
}
try {
if (istransEnable && istransType) {
// final String transactionId =
- // jsonMessage.getString("transactionId");
+
// responseTransactionId = transId;
StringBuilder consumerInfo = new StringBuilder();
if (null != dmaapContext && null != dmaapContext.getRequest()) {
}
});
- // if (null != dmaapContext && isTransactionEnabled()) {
+
if (null != dmaapContext && istransEnable && istransType) {
dmaapContext.getResponse().setHeader("transactionId",
if (null != strclose_out_stream)
close_out_stream = Boolean.parseBoolean(strclose_out_stream);
- // if (fSettings.getBoolean("close.output.stream", true)) {
+
if (close_out_stream) {
os.close();
- // }
+
}
}
*
* Checks whether filter is initialized
*/
- /*private boolean isFilterInitialized() {
- return (fHpAlarmFilter != null && fHppe != null);
- }
-*/
+
+
+
+
/**
*
* @param msg
*/
private boolean filterMatches(String msg) {
boolean result = true;
- /*if (isFilterInitialized()) {
- try {
- final HpJsonEvent e = new HpJsonEvent("e", new JSONObject(msg));
- result = fHpAlarmFilter.matches(fHppe, e);
- } catch (JSONException x) {
- // the msg may not be JSON
- result = false;
- log.error("Failed due to " + x.getMessage());
- } catch (Exception x) {
- log.error("Error using filter: " + x.getMessage(), x);
- }
- }*/
+
+
+
+
+
+
+
+
+
+
+
+
return result;
}
private final Consumer fConsumer;
private final int fLimit;
private final int fTimeoutMs;
- // private final rrNvReadable fSettings;
+
private final boolean fPretty;
private final boolean fWithMeta;
private int fSent;
-// private final HpAlarmFilter<HpJsonEvent> fHpAlarmFilter;
+
//private final HpProcessingEngine<HpJsonEvent> fHppe;
private DMaaPContext dmaapContext;
private String responseTransactionId;
private ArrayList<Consumer> fKafkaConsumerList;
private boolean istransType = true;
// private static final Logger log =
- // Logger.getLogger(CambriaOutboundEventStream.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
}
\ No newline at end of file
String topicNameStd = null;
// topicNameStd=
- // ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF");
+
topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
"enforced.topic.name.AAF");
String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
&& !topic.equalsIgnoreCase(metricTopicname))) {
// the topic name will be sent by the client
- // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub";
+
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
String permission = aaf.aafPermissionString(topic, "pub");
if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
if (null != batchlen)
maxEventBatch = Long.parseLong(batchlen);
// long maxEventBatch =
- // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
+
final LinkedList<Publisher.message> batch = new LinkedList<>();
// final ArrayList<KeyedMessage<String, String>> kms = new
// ArrayList<KeyedMessage<String, String>>();
throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
CambriaApiException, IOException {
- // final long startTime = System.currentTimeMillis();
+
final HttpServletRequest req = ctx.getRequest();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
limit = Integer.parseInt(req.getParameter("limit"));
}
limit = 1;
- // int timeoutMs = 60000;
+
int timeoutMs = CambriaConstants.kNoTimeout;
String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
if (strtimeoutMS != null)
timeoutMs = Integer.parseInt(strtimeoutMS);
// int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
- // CambriaConstants.kNoTimeout);
+
if (req.getParameter("timeout") != null) {
timeoutMs = Integer.parseInt(req.getParameter("timeout"));
}