} catch (InterruptedException x) {
log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
x);
+ Thread.currentThread().interrupt();
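+ // restore the interrupt flag so callers further up the stack can still observe the interruption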
}
Callable<Boolean> run = new Callable<Boolean>() {
}
} catch (KafkaException x) {
- log.debug(fLogTag + ": KafkaException " + x.getMessage());
+ log.debug(fLogTag + ": KafkaException ", x);
} catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
- log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
- + x.getMessage());
+ log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
}
future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait up to consumerPollTimeOut seconds
} catch (TimeoutException ex) {
+ log.error("TimeoutException in in Kafka consumer ", ex);
// timed out. Try to stop the code if possible.
String apiNodeId = null;
try {
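+ // identify this API node as host:port for the live-lock avoider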
apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
} catch (UnknownHostException e1) {
- // TODO Auto-generated catch block
- log.error("unable to get the localhost address");
+ log.error("unable to get the localhost address ", e1);
}
try {
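+ // ask the live-lock avoider to unlock this consumer group for this API node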
if (fKafkaLiveLockAvoider != null)
fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
} catch (Exception e) {
- log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
+ log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
}
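+ // nudge the consumer with a forced poll before interrupting the stuck task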
forcePollOnConsumer();
future.cancel(true);
} catch (Exception ex) {
+ log.error("Exception in in Kafka consumer ", ex);
// timed out. Try to stop the code if possible.
future.cancel(true);
}
} catch (TimeoutException ex) {
// timed out. Try to stop the code if possible.
- log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
+ log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
future.cancel(true);
setState(Kafka011Consumer.State.OPENED);
} catch (Exception ex) {
// timed out. Try to stop the code if possible.
- log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
- + ex);
+ log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
future.cancel(true);
setState(Kafka011Consumer.State.OPENED);
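+ // close did not complete in time; leave the consumer marked open and report failure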
return false;
}
} catch (Exception e) {
- log.error("Failed and go to Exception block for " + fGroup + " " + e.getMessage());
+ log.error("Failed and go to Exception block for " + fGroup +" ", e);
}
}
});
}
} catch (java.util.ConcurrentModificationException e) {
- log.error("Error occurs for " + e);
+ log.error("Error occurs for ", e);
} catch (Exception e) {
- log.error("Failed and go to Exception block for " + group + " " + e.getMessage());
+ log.error("Failed and go to Exception block for " + group + " ", e);
}
}
});
curatorConsumerCache.close();
log.info("Curator client closed");
} catch (ZkInterruptedException e) {
- log.warn("Curator client close interrupted: " + e.getMessage());
+ log.warn("Curator client close interrupted: ", e);
} catch (IOException e) {
- log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage());
+ log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
}
curatorConsumerCache = null;
log.info(" ^ deleted " + fBaseZkPath + "/" + key);
} catch (NoNodeException e) {
log.warn("A consumer was deleted from " + fApiId
- + "'s cache, but no Cambria API node had ownership of it in ZooKeeper");
+ + "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
} catch (Exception e) {
- log.debug("Unexpected exception while deleting consumer: " + e.getMessage());
- log.info(" %%%%%%@# Unexpected exception while deleting consumer: " + e.getMessage());
+ log.debug("Unexpected exception while deleting consumer: ", e);
+ log.info(" %%%%%%@# Unexpected exception while deleting consumer: ", e);
}
try {
try {
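+ // claim ownership by writing this API node's ID to the consumer's ZooKeeper node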
curator.setData().forPath(consumerPath, fApiId.getBytes());
} catch (KeeperException.NoNodeException e) {
+ log.info("KeeperException.NoNodeException occured", e);
curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
}
log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-/*import com.att.sa.highlandPark.config.HpConfigContext;
-import com.att.sa.highlandPark.config.HpReaderException;
-import com.att.sa.highlandPark.events.HpJsonEvent;
-import com.att.sa.highlandPark.events.HpJsonEventFactory;
-import com.att.sa.highlandPark.processor.HpAlarmFilter;
-import com.att.sa.highlandPark.processor.HpEvent;
-import com.att.sa.highlandPark.processor.HpProcessingEngine;
-import com.att.sa.highlandPark.processor.HpProcessingEngine.EventFactory;
-*/
/**
* class used to write the consumed messages
*
- // public Builder atOffset ( int pos )
- // return this;
- // }
/**
* method returns an object of CambriaOutboundEventStream
fPretty = builder.fPretty;
fWithMeta = builder.fWithMeta;
fKafkaConsumerList = builder.fKafkaConsumerList;
- /* if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) {
- fHpAlarmFilter = null;
- fHppe = null;
- } else {
- try {
- final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter));
- HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>();
- fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter);
- final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory();
- fHppe = new HpProcessingEngine<HpJsonEvent>(ef);
- } catch (HpReaderException e) {
- // JSON was okay, but the filter engine says it's bogus
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
- "Couldn't create filter: " + e.getMessage());
- } catch (JSONException e) {
- // user sent a bogus JSON object
- throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
- "Couldn't parse JSON: " + e.getMessage());
- }
- }*/
}
/**
public void write(final OutputStream os) throws IOException {
- // final boolean transactionEnabled = istransEnable;
// synchronized(this){
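+ // start the JSON array that wraps the outbound messages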
os.write('[');
fSent = forEachMessage(new operation() {
private final boolean fWithMeta;
private int fSent;
- //private final HpProcessingEngine<HpJsonEvent> fHppe;
private DMaaPContext dmaapContext;
private String responseTransactionId;
private Topic topic;
log.info("Broker Type is:" + CambriaConstants.kBrokerType_Memory);
fPublisher = new MemoryQueuePublisher(q, mmb);
- //Ramkumar remove below
- // fMetaBroker = mmb;
fConsumerFactory = new MemoryConsumerFactory(q);
} else {
throw new IllegalArgumentException(
*
* @return
*/
- /*public rrNvReadable getSettings() {
- return settings;
- }*/
- /**
- * method to set rrNvReadable object
- *
- * @param settings
- */
- /*public void setSettings(rrNvReadable settings) {
- this.settings = settings;
- }*/
/**
* method to get the main ZooKeeper connection string
* @return
*/
public static String getMainZookeeperConnectionString() {
- //return settings.getString(CambriaConstants.kSetting_ZkConfigDbServers, CambriaConstants.kDefault_ZkConfigDbServers);
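+ // read the ZooKeeper connect string from the MsgRtr property file, falling back to the default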
String zkServername = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
if (zkServername==null) zkServername=CambriaConstants.kDefault_ZkConfigDbServers;