Add logging into consumer 21/49321/1
authorDaniel Silverthorn <daniel.silverthorn@amdocs.com>
Mon, 28 May 2018 21:11:50 +0000 (17:11 -0400)
committerDaniel Silverthorn <daniel.silverthorn@amdocs.com>
Mon, 28 May 2018 21:13:30 +0000 (17:13 -0400)
Change-Id: I0b5c045bac485523c4d1353d81a467c3c8ba918f
Issue-ID: AAI-1182
Signed-off-by: Daniel Silverthorn <daniel.silverthorn@amdocs.com>
src/main/java/org/onap/aai/event/EventBusConsumer.java

index e795e2c..da8ffb5 100644 (file)
@@ -31,6 +31,7 @@ import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.cl.mdc.MdcContext;
 
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -69,21 +70,27 @@ public class EventBusConsumer extends ScheduledPollConsumer {
   @Override
   protected int poll() throws Exception {
 
-    logger.debug("Checking for event on topic: " + endpoint.getEventTopic());
+    String topic = endpoint.getEventTopic();
 
-    int processCount = 0;
+    logger.debug("Checking for event on topic: " + topic);
 
-    //Iterable<String> messages = consumer.fetch();
-    Iterable<String> messages = consumer.consumeAndCommit();
+    int processCount = 0;
 
-    String topic = endpoint.getEventTopic();
+    try {
+      Iterable<String> messages = consumer.consumeAndCommit();
 
-    for (String message : messages) {
-      Exchange exchange = endpoint.createExchange();
-      exchange.getIn().setBody(message);
-      getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
-      ++processCount;
+      for (String message : messages) {
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody(message);
+        getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
+        ++processCount;
+      }
+    } catch (Exception e) {
+      logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage());
     }
+
+    logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount));
+
     return processCount;
   }
   @Override