import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.support.ScheduledPollConsumer;
import org.onap.aai.logging.RouterCoreMsgs;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
@Override
protected int poll() throws Exception {
- logger.debug("Checking for event on topic: " + endpoint.getEventTopic());
+ String topic = endpoint.getEventTopic();
- int processCount = 0;
+ logger.debug("Checking for event on topic: " + topic);
- //Iterable<String> messages = consumer.fetch();
- Iterable<String> messages = consumer.consumeAndCommit();
+ int processCount = 0;
- String topic = endpoint.getEventTopic();
+ try {
+ Iterable<String> messages = consumer.consumeAndCommit();
- for (String message : messages) {
- Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody(message);
- getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
- ++processCount;
+ for (String message : messages) {
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody(message);
+ getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
+ ++processCount;
+ }
+ } catch (Exception e) {
+ logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage());
}
+
+ logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount));
+
return processCount;
}
@Override
protected void doStop() throws Exception {
super.doStop();
if (endpoint != null) {
- endpoint.close();
+ endpoint.end();
}
}
@Override
protected void doShutdown() throws Exception {
super.doShutdown();
if (endpoint != null) {
- endpoint.close();
+ endpoint.end();
}
}