Update to consume and publish events in new format
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncRequestProcessor.java
index 610cac9..334871e 100644 (file)
@@ -26,16 +26,17 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
-
 import javax.naming.OperationNotSupportedException;
 import javax.ws.rs.core.Response.Status;
-
 import org.onap.aai.champcore.ChampTransaction;
 import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
 import org.onap.champ.ChampRESTAPI;
 import org.onap.champ.event.GraphEvent;
 import org.onap.champ.event.GraphEvent.GraphEventResult;
+import org.onap.champ.event.envelope.GraphEventEnvelope;
+import org.onap.champ.event.envelope.GraphEventHeader;
 import org.onap.champ.event.GraphEventEdge;
 import org.onap.champ.event.GraphEventVertex;
 import org.onap.champ.exception.ChampServiceException;
@@ -43,281 +44,283 @@ import org.onap.champ.service.ChampDataService;
 import org.onap.champ.service.ChampThreadFactory;
 import org.onap.champ.service.logging.ChampMsgs;
 
-import org.onap.aai.event.api.EventConsumer;
-
 /**
- * This Class polls the Graph events from request topic perform the necessary
- * CRUD operation by calling champDAO and queues up the response to be consumed
- * by response handler.
+ * This Class polls the Graph events from request topic perform the necessary CRUD operation by calling champDAO and
+ * queues up the response to be consumed by response handler.
  */
 public class ChampAsyncRequestProcessor extends TimerTask {
 
-  private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
+    private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
 
-  private ChampDataService champDataService;
+    private ChampDataService champDataService;
 
-  /**
-   * Number of events that can be queued up.
-   */
-  private Integer requestProcesserQueueSize;
+    /**
+     * Number of events that can be queued up.
+     */
+    private Integer requestProcesserQueueSize;
 
-  /**
-   * Number of event publisher worker threads.
-   */
-  private Integer requestProcesserPoolSize;
-  
-  /**
-   * Number of event publisher worker threads.
-   */
-  private Integer requestPollingTimeSeconds;
+    /**
+     * Number of event publisher worker threads.
+     */
+    private Integer requestProcesserPoolSize;
 
-  /**
-   * Internal queue where outgoing events will be buffered until they can be
-   * serviced by.
-   **/
-  private BlockingQueue<GraphEvent> requestProcesserEventQueue;
+    /**
+     * Number of event publisher worker threads.
+     */
+    private Integer requestPollingTimeSeconds;
 
-  /**
-   * Pool of worker threads that do the work of publishing the events to the
-   * event bus.
-   */
-  private ThreadPoolExecutor requestProcesserPool;
+    /**
+     * Internal queue where outgoing events will be buffered until they can be serviced by.
+     **/
+    private BlockingQueue<GraphEventEnvelope> requestProcesserEventQueue;
 
-  private ChampAsyncResponsePublisher champAsyncResponsePublisher;
+    /**
+     * Pool of worker threads that do the work of publishing the events to the event bus.
+     */
+    private ThreadPoolExecutor requestProcesserPool;
 
-  private EventConsumer asyncRequestConsumer;
+    private ChampAsyncResponsePublisher champAsyncResponsePublisher;
 
-  private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
+    private EventConsumer asyncRequestConsumer;
 
-  private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
-  private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
-  private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
-  Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
+    private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
 
-  public ChampAsyncRequestProcessor(ChampDataService champDataService,
-      ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
+    private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
+    private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
+    private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
+    Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
 
-    this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
+    public ChampAsyncRequestProcessor(ChampDataService champDataService,
+            ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
 
-    this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
+        this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
 
-    this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
-    requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
-    requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
-        new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
+        this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
 
-    for (int i = 0; i < requestProcesserPoolSize; i++) {
-      requestProcesserPool.submit(new ChampProcessorWorker());
+        this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
+        requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
+        requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
+                new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
+
+        for (int i = 0; i < requestProcesserPoolSize; i++) {
+            requestProcesserPool.submit(new ChampProcessorWorker());
+        }
+
+        this.champDataService = champDataService;
+        this.champAsyncResponsePublisher = champAsyncResponsePublisher;
+        this.asyncRequestConsumer = asyncRequestConsumer;
+        logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+                "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
+                        + asyncRequestConsumer.getClass().getName());
     }
 
-    this.champDataService = champDataService;
-    this.champAsyncResponsePublisher = champAsyncResponsePublisher;
-    this.asyncRequestConsumer = asyncRequestConsumer;
-    logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
-        "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
-            + asyncRequestConsumer.getClass().getName());
-  }
-  
-  
+    public ChampAsyncRequestProcessor(ChampDataService champDataService,
+            ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
+            Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
+
+        this.requestProcesserQueueSize = requestProcesserQueueSize;
 
-  public ChampAsyncRequestProcessor(ChampDataService champDataService,
-      ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
-      Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
+        this.requestProcesserPoolSize = requestProcesserPoolSize;
 
-    this.requestProcesserQueueSize = requestProcesserQueueSize;
+        this.requestPollingTimeSeconds = requestPollingTimeSeconds;
 
-    this.requestProcesserPoolSize = requestProcesserPoolSize;
-    
-    this.requestPollingTimeSeconds = requestPollingTimeSeconds;
+        requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
+        requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
+                new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
 
-    requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
-    requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
-        new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
+        for (int i = 0; i < requestProcesserPoolSize; i++) {
+            requestProcesserPool.submit(new ChampProcessorWorker());
+        }
 
-    for (int i = 0; i < requestProcesserPoolSize; i++) {
-      requestProcesserPool.submit(new ChampProcessorWorker());
+        this.champDataService = champDataService;
+        this.champAsyncResponsePublisher = champAsyncResponsePublisher;
+        this.asyncRequestConsumer = asyncRequestConsumer;
+        logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+                "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
+                        + asyncRequestConsumer.getClass().getName());
     }
 
-    this.champDataService = champDataService;
-    this.champAsyncResponsePublisher = champAsyncResponsePublisher;
-    this.asyncRequestConsumer = asyncRequestConsumer;
-    logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
-        "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
-            + asyncRequestConsumer.getClass().getName());
-  }
+    private class ChampProcessorWorker implements Runnable {
+
+        @Override
+        public void run() {
+
+            while (true) {
+
+                GraphEventEnvelope eventEnvelope = null;
+                GraphEvent event = null;
+                try {
+                    // Get the next event to be published from the queue.
+                    eventEnvelope = requestProcesserEventQueue.take();
+                    event = eventEnvelope.getBody();
+                } catch (InterruptedException e) {
+                    // Restore the interrupted status.
+                    Thread.currentThread().interrupt();
+                }
+
+                // Apply Champ Event header
+                eventEnvelope.setHeader(GraphEventHeader.builder().requestId(event.getTransactionId()).build());
+
+                // Parse the event and call champ Dao to process , Create the
+                // response event and put it on response queue
+                event.setResult(GraphEventResult.SUCCESS);
+
+                // Check if this request is part of an ongoing DB transaction
+                ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
+                if ((event.getDbTransactionId() != null) && (transaction == null)) {
+                    event.setResult(GraphEventResult.FAILURE);
+                    event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
+                    event.setHttpErrorStatus(Status.BAD_REQUEST);
+                }
+
+                if (event.getResult() != GraphEventResult.FAILURE) {
+                    try {
+                        if (event.getVertex() != null) {
+
+                            switch (event.getOperation()) {
+                                case CREATE:
+                                    event.setVertex(GraphEventVertex.fromChampObject(
+                                            champDataService.storeObject(event.getVertex().toChampObject(),
+                                                    Optional.ofNullable(transaction)),
+                                            event.getVertex().getModelVersion()));
+                                    break;
+
+                                case UPDATE:
+                                    event.setVertex(GraphEventVertex.fromChampObject(
+                                            champDataService.replaceObject(event.getVertex().toChampObject(),
+                                                    event.getVertex().getId(), Optional.ofNullable(transaction)),
+                                            event.getVertex().getModelVersion()));
+                                    break;
+                                case DELETE:
+                                    champDataService.deleteObject(event.getVertex().getId(),
+                                            Optional.ofNullable(transaction));
+                                    break;
+                                default:
+                                    // log error
+                            }
+                        } else if (event.getEdge() != null) {
+                            switch (event.getOperation()) {
+                                case CREATE:
+                                    event.setEdge(GraphEventEdge.fromChampRelationship(
+                                            champDataService.storeRelationship(event.getEdge().toChampRelationship(),
+                                                    Optional.ofNullable(transaction)),
+                                            event.getEdge().getModelVersion()));
+                                    break;
+
+                                case UPDATE:
+                                    event.setEdge(GraphEventEdge.fromChampRelationship(
+                                            champDataService.updateRelationship(event.getEdge().toChampRelationship(),
+                                                    event.getEdge().getId(), Optional.ofNullable(transaction)),
+                                            event.getEdge().getModelVersion()));
+
+                                    break;
+                                case DELETE:
+                                    champDataService.deleteRelationship(event.getEdge().getId(),
+                                            Optional.ofNullable(transaction));
+                                    break;
+                                default:
+                                    logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+                                            "Invalid operation for event transactionId: " + event.getTransactionId());
+                            }
+
+                        } else {
+                            logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+                                    "Invalid payload for event transactionId: " + event.getTransactionId());
+                        }
+                    } catch (ChampServiceException champException) {
+                        logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
+                        event.setResult(GraphEventResult.FAILURE);
+                        event.setErrorMessage(champException.getMessage());
+                        event.setHttpErrorStatus(champException.getHttpStatus());
+
+                    } catch (Exception ex) {
+                        logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
+                        event.setResult(GraphEventResult.FAILURE);
+                        event.setErrorMessage(ex.getMessage());
+                        event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
+                    }
+                }
+
+                if (event.getResult().equals(GraphEventResult.SUCCESS)) {
+                    logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+                            "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+                                    + " , transaction-id: " + event.getTransactionId() + " , operation: "
+                                    + event.getOperation().toString() + " , result: " + event.getResult());
+                } else {
+                    logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+                            "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+                                    + " , transaction-id: " + event.getTransactionId() + " , operation: "
+                                    + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
+                                    + event.getErrorMessage());
+                }
+
+                champAsyncResponsePublisher.publishResponseEvent(eventEnvelope);
 
-  private class ChampProcessorWorker implements Runnable {
+            }
+        }
+    }
 
     @Override
     public void run() {
 
-      while (true) {
+        logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
 
-        GraphEvent event = null;
-        try {
-          // Get the next event to be published from the queue.
-          event = requestProcesserEventQueue.take();
-        } catch (InterruptedException e) {
-          // Restore the interrupted status.
-          Thread.currentThread().interrupt();
-        }
-
-        // Parse the event and call champ Dao to process , Create the
-        // response event and put it on response queue
-        event.setResult(GraphEventResult.SUCCESS);
-        
-        // Check if this request is part of an ongoing DB transaction
-        ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
-        if ( (event.getDbTransactionId() != null) && (transaction == null) ) {
-          event.setResult(GraphEventResult.FAILURE);
-          event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
-          event.setHttpErrorStatus(Status.BAD_REQUEST);
-        }
-        
-        if (event.getResult() != GraphEventResult.FAILURE) {
-          try {
-            if (event.getVertex() != null) {
-
-              switch (event.getOperation()) {
-              case CREATE:
-                event.setVertex(GraphEventVertex.fromChampObject(
-                    champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)),
-                    event.getVertex().getModelVersion()));
-                break;
-
-              case UPDATE:
-                event.setVertex(GraphEventVertex.fromChampObject(
-                    champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)),
-                    event.getVertex().getModelVersion()));
-                break;
-              case DELETE:
-                champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction));
-                break;
-              default:
-                // log error
-              }
-            } else if (event.getEdge() != null) {
-              switch (event.getOperation()) {
-              case CREATE:
-                event.setEdge(GraphEventEdge.fromChampRelationship(
-                    champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)),
-                    event.getEdge().getModelVersion()));
-                break;
-
-              case UPDATE:
-                event.setEdge(GraphEventEdge.fromChampRelationship(champDataService
-                    .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)),
-                    event.getEdge().getModelVersion()));
-
-                break;
-              case DELETE:
-                champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction));
-                break;
-              default:
-                logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
-                    "Invalid operation for event transactionId: " + event.getTransactionId());
-              }
-
-            } else {
-              logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
-                  "Invalid payload for event transactionId: " + event.getTransactionId());
-            }
-          } catch (ChampServiceException champException) {
-            logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
-            event.setResult(GraphEventResult.FAILURE);
-            event.setErrorMessage(champException.getMessage());
-            event.setHttpErrorStatus(champException.getHttpStatus());
-
-          } catch (Exception ex) {
-            logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
-            event.setResult(GraphEventResult.FAILURE);
-            event.setErrorMessage(ex.getMessage());
-            event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
-          }
+        if (asyncRequestConsumer == null) {
+            logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+                    "Unable to initialize ChampAsyncRequestProcessor");
         }
 
-        if (event.getResult().equals(GraphEventResult.SUCCESS)) {
-          logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
-              "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
-                  + " , transaction-id: " + event.getTransactionId() + " , operation: "
-                  + event.getOperation().toString() + " , result: " + event.getResult());
-        } else {
-          logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
-              "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
-                  + " , transaction-id: " + event.getTransactionId() + " , operation: "
-                  + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
-                  + event.getErrorMessage());
+        Iterable<String> events = null;
+        try {
+            events = asyncRequestConsumer.consume();
+        } catch (Exception e) {
+            logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+            return;
         }
 
-        champAsyncResponsePublisher.publishResponseEvent(event);
-
-      }
-    }
-  }
-
-  @Override
-  public void run() {
-
-    logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
+        if (events == null || !events.iterator().hasNext()) {
+            logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
 
-    if (asyncRequestConsumer == null) {
-      logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor");
-    }
-
-    Iterable<String> events = null;
-    try {
-      events = asyncRequestConsumer.consume();
-    } catch (Exception e) {
-      logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
-      return;
-    }
-
-    if (events == null || !events.iterator().hasNext()) {
-      logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
+        }
 
-    }
+        for (String event : events) {
+            try {
+                GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
+                GraphEvent requestEvent = requestEnvelope.getBody();
+                auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+                        "Event received of type: " + requestEvent.getObjectType() + " with key: "
+                                + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+                                + " , operation: " + requestEvent.getOperation().toString());
+                logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+                        "Event received of type: " + requestEvent.getObjectType() + " with key: "
+                                + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+                                + " , operation: " + requestEvent.getOperation().toString());
+                logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
+
+                // Try to submit the event to be published to the event bus.
+                if (!requestProcesserEventQueue.offer(requestEnvelope)) {
+                    logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+                            "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
+                }
+
+            } catch (Exception e) {
+                logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+            }
+        }
 
-    for (String event : events) {
-      try {
-        GraphEvent requestEvent = GraphEvent.fromJson(event);
-        auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
-            "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
-                + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
-                + requestEvent.getOperation().toString());
-        logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
-            "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
-                + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
-                + requestEvent.getOperation().toString());
-        logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
-
-        // Try to submit the event to be published to the event bus.
-        if (!requestProcesserEventQueue.offer(requestEvent)) {
-          logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
-              "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
+        try {
+            asyncRequestConsumer.commitOffsets();
+        } catch (OperationNotSupportedException e) {
+            // Dmaap doesnt support commit with offset
+            logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+        } catch (Exception e) {
+            logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
         }
 
-      } catch (Exception e) {
-        logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
-      }
     }
 
-    try {
-      asyncRequestConsumer.commitOffsets();
-    } catch(OperationNotSupportedException e) {
-        //Dmaap doesnt support commit with offset      
-        logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
-    } 
-    catch (Exception e) {
-      logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+    public Integer getRequestPollingTimeSeconds() {
+        return requestPollingTimeSeconds;
     }
 
-  }
-
-
-
-  public Integer getRequestPollingTimeSeconds() {
-    return requestPollingTimeSeconds;
-  }
-
-  
 }