Add logging into consumer
[aai/router-core.git] / src / main / java / org / onap / aai / event / EventBusConsumer.java
index e795e2c..da8ffb5 100644 (file)
@@ -31,6 +31,7 @@ import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.cl.mdc.MdcContext;
 
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -69,21 +70,27 @@ public class EventBusConsumer extends ScheduledPollConsumer {
   @Override
   protected int poll() throws Exception {
 
-    logger.debug("Checking for event on topic: " + endpoint.getEventTopic());
+    String topic = endpoint.getEventTopic();
 
-    int processCount = 0;
+    logger.debug("Checking for event on topic: " + topic);
 
-    //Iterable<String> messages = consumer.fetch();
-    Iterable<String> messages = consumer.consumeAndCommit();
+    int processCount = 0;
 
-    String topic = endpoint.getEventTopic();
+    try {
+      Iterable<String> messages = consumer.consumeAndCommit();
 
-    for (String message : messages) {
-      Exchange exchange = endpoint.createExchange();
-      exchange.getIn().setBody(message);
-      getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
-      ++processCount;
+      for (String message : messages) {
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody(message);
+        getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
+        ++processCount;
+      }
+    } catch (Exception e) {
+      logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage());
     }
+
+    logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount));
+
     return processCount;
   }
   @Override