Update apache camel from 2.x to 3.x
[aai/router-core.git] / src / main / java / org / onap / aai / event / EventBusConsumer.java
index e795e2c..bc698df 100644 (file)
@@ -25,7 +25,7 @@ import org.onap.aai.event.api.EventConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.support.ScheduledPollConsumer;
 import org.onap.aai.logging.RouterCoreMsgs;
 import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
@@ -69,35 +69,41 @@ public class EventBusConsumer extends ScheduledPollConsumer {
   @Override
   protected int poll() throws Exception {
 
-    logger.debug("Checking for event on topic: " + endpoint.getEventTopic());
+    String topic = endpoint.getEventTopic();
 
-    int processCount = 0;
+    logger.debug("Checking for event on topic: " + topic);
 
-    //Iterable<String> messages = consumer.fetch();
-    Iterable<String> messages = consumer.consumeAndCommit();
+    int processCount = 0;
 
-    String topic = endpoint.getEventTopic();
+    try {
+      Iterable<String> messages = consumer.consumeAndCommit();
 
-    for (String message : messages) {
-      Exchange exchange = endpoint.createExchange();
-      exchange.getIn().setBody(message);
-      getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
-      ++processCount;
+      for (String message : messages) {
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody(message);
+        getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
+        ++processCount;
+      }
+    } catch (Exception e) {
+      logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage());
     }
+
+    logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount));
+
     return processCount;
   }
   @Override
   protected void doStop() throws Exception {
     super.doStop();
     if (endpoint != null) {
-      endpoint.close();
+      endpoint.end();
     }
   }
   @Override
   protected void doShutdown() throws Exception {
     super.doShutdown();
     if (endpoint != null) {
-      endpoint.close();
+      endpoint.end();
     }
   }