import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
-
import javax.naming.OperationNotSupportedException;
import javax.ws.rs.core.Response.Status;
-
import org.onap.aai.champcore.ChampTransaction;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
import org.onap.champ.ChampRESTAPI;
import org.onap.champ.event.GraphEvent;
import org.onap.champ.event.GraphEvent.GraphEventResult;
+import org.onap.champ.event.envelope.GraphEventEnvelope;
+import org.onap.champ.event.envelope.GraphEventHeader;
import org.onap.champ.event.GraphEventEdge;
import org.onap.champ.event.GraphEventVertex;
import org.onap.champ.exception.ChampServiceException;
import org.onap.champ.service.ChampThreadFactory;
import org.onap.champ.service.logging.ChampMsgs;
-import org.onap.aai.event.api.EventConsumer;
-
/**
- * This Class polls the Graph events from request topic perform the necessary
- * CRUD operation by calling champDAO and queues up the response to be consumed
- * by response handler.
+ * This Class polls the Graph events from request topic perform the necessary CRUD operation by calling champDAO and
+ * queues up the response to be consumed by response handler.
*/
public class ChampAsyncRequestProcessor extends TimerTask {
- private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
+ private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
- private ChampDataService champDataService;
+ private ChampDataService champDataService;
- /**
- * Number of events that can be queued up.
- */
- private Integer requestProcesserQueueSize;
+ /**
+ * Number of events that can be queued up.
+ */
+ private Integer requestProcesserQueueSize;
- /**
- * Number of event publisher worker threads.
- */
- private Integer requestProcesserPoolSize;
-
- /**
- * Number of event publisher worker threads.
- */
- private Integer requestPollingTimeSeconds;
+ /**
+ * Number of event publisher worker threads.
+ */
+ private Integer requestProcesserPoolSize;
- /**
- * Internal queue where outgoing events will be buffered until they can be
- * serviced by.
- **/
- private BlockingQueue<GraphEvent> requestProcesserEventQueue;
+ /**
+ * Number of event publisher worker threads.
+ */
+ private Integer requestPollingTimeSeconds;
- /**
- * Pool of worker threads that do the work of publishing the events to the
- * event bus.
- */
- private ThreadPoolExecutor requestProcesserPool;
+ /**
+ * Internal queue where outgoing events will be buffered until they can be serviced by.
+ **/
+ private BlockingQueue<GraphEventEnvelope> requestProcesserEventQueue;
- private ChampAsyncResponsePublisher champAsyncResponsePublisher;
+ /**
+ * Pool of worker threads that do the work of publishing the events to the event bus.
+ */
+ private ThreadPoolExecutor requestProcesserPool;
- private EventConsumer asyncRequestConsumer;
+ private ChampAsyncResponsePublisher champAsyncResponsePublisher;
- private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
+ private EventConsumer asyncRequestConsumer;
- private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
- private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
- private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
- Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
+ private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
- public ChampAsyncRequestProcessor(ChampDataService champDataService,
- ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
+ private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
+ private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
+ private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
+ Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
- this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
+ public ChampAsyncRequestProcessor(ChampDataService champDataService,
+ ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
- this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
+ this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
- this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
- requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
- requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
- new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
+ this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
- for (int i = 0; i < requestProcesserPoolSize; i++) {
- requestProcesserPool.submit(new ChampProcessorWorker());
+ this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
+ requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
+ requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
+ new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
+
+ for (int i = 0; i < requestProcesserPoolSize; i++) {
+ requestProcesserPool.submit(new ChampProcessorWorker());
+ }
+
+ this.champDataService = champDataService;
+ this.champAsyncResponsePublisher = champAsyncResponsePublisher;
+ this.asyncRequestConsumer = asyncRequestConsumer;
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
+ + asyncRequestConsumer.getClass().getName());
}
- this.champDataService = champDataService;
- this.champAsyncResponsePublisher = champAsyncResponsePublisher;
- this.asyncRequestConsumer = asyncRequestConsumer;
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
- + asyncRequestConsumer.getClass().getName());
- }
-
-
+ public ChampAsyncRequestProcessor(ChampDataService champDataService,
+ ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
+ Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
+
+ this.requestProcesserQueueSize = requestProcesserQueueSize;
- public ChampAsyncRequestProcessor(ChampDataService champDataService,
- ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
- Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
+ this.requestProcesserPoolSize = requestProcesserPoolSize;
- this.requestProcesserQueueSize = requestProcesserQueueSize;
+ this.requestPollingTimeSeconds = requestPollingTimeSeconds;
- this.requestProcesserPoolSize = requestProcesserPoolSize;
-
- this.requestPollingTimeSeconds = requestPollingTimeSeconds;
+ requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
+ requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
+ new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
- requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
- requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
- new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
+ for (int i = 0; i < requestProcesserPoolSize; i++) {
+ requestProcesserPool.submit(new ChampProcessorWorker());
+ }
- for (int i = 0; i < requestProcesserPoolSize; i++) {
- requestProcesserPool.submit(new ChampProcessorWorker());
+ this.champDataService = champDataService;
+ this.champAsyncResponsePublisher = champAsyncResponsePublisher;
+ this.asyncRequestConsumer = asyncRequestConsumer;
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
+ + asyncRequestConsumer.getClass().getName());
}
- this.champDataService = champDataService;
- this.champAsyncResponsePublisher = champAsyncResponsePublisher;
- this.asyncRequestConsumer = asyncRequestConsumer;
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
- + asyncRequestConsumer.getClass().getName());
- }
+ private class ChampProcessorWorker implements Runnable {
+
+ @Override
+ public void run() {
+
+ while (true) {
+
+ GraphEventEnvelope eventEnvelope = null;
+ GraphEvent event = null;
+ try {
+ // Get the next event to be published from the queue.
+ eventEnvelope = requestProcesserEventQueue.take();
+ event = eventEnvelope.getBody();
+ } catch (InterruptedException e) {
+ // Restore the interrupted status.
+ Thread.currentThread().interrupt();
+ }
+
+ // Apply Champ Event header
+ eventEnvelope.setHeader(GraphEventHeader.builder().requestId(event.getTransactionId()).build());
+
+ // Parse the event and call champ Dao to process , Create the
+ // response event and put it on response queue
+ event.setResult(GraphEventResult.SUCCESS);
+
+ // Check if this request is part of an ongoing DB transaction
+ ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
+ if ((event.getDbTransactionId() != null) && (transaction == null)) {
+ event.setResult(GraphEventResult.FAILURE);
+ event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
+ event.setHttpErrorStatus(Status.BAD_REQUEST);
+ }
+
+ if (event.getResult() != GraphEventResult.FAILURE) {
+ try {
+ if (event.getVertex() != null) {
+
+ switch (event.getOperation()) {
+ case CREATE:
+ event.setVertex(GraphEventVertex.fromChampObject(
+ champDataService.storeObject(event.getVertex().toChampObject(),
+ Optional.ofNullable(transaction)),
+ event.getVertex().getModelVersion()));
+ break;
+
+ case UPDATE:
+ event.setVertex(GraphEventVertex.fromChampObject(
+ champDataService.replaceObject(event.getVertex().toChampObject(),
+ event.getVertex().getId(), Optional.ofNullable(transaction)),
+ event.getVertex().getModelVersion()));
+ break;
+ case DELETE:
+ champDataService.deleteObject(event.getVertex().getId(),
+ Optional.ofNullable(transaction));
+ break;
+ default:
+ // log error
+ }
+ } else if (event.getEdge() != null) {
+ switch (event.getOperation()) {
+ case CREATE:
+ event.setEdge(GraphEventEdge.fromChampRelationship(
+ champDataService.storeRelationship(event.getEdge().toChampRelationship(),
+ Optional.ofNullable(transaction)),
+ event.getEdge().getModelVersion()));
+ break;
+
+ case UPDATE:
+ event.setEdge(GraphEventEdge.fromChampRelationship(
+ champDataService.updateRelationship(event.getEdge().toChampRelationship(),
+ event.getEdge().getId(), Optional.ofNullable(transaction)),
+ event.getEdge().getModelVersion()));
+
+ break;
+ case DELETE:
+ champDataService.deleteRelationship(event.getEdge().getId(),
+ Optional.ofNullable(transaction));
+ break;
+ default:
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ "Invalid operation for event transactionId: " + event.getTransactionId());
+ }
+
+ } else {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ "Invalid payload for event transactionId: " + event.getTransactionId());
+ }
+ } catch (ChampServiceException champException) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
+ event.setResult(GraphEventResult.FAILURE);
+ event.setErrorMessage(champException.getMessage());
+ event.setHttpErrorStatus(champException.getHttpStatus());
+
+ } catch (Exception ex) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
+ event.setResult(GraphEventResult.FAILURE);
+ event.setErrorMessage(ex.getMessage());
+ event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ if (event.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult());
+ } else {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
+ + event.getErrorMessage());
+ }
+
+ champAsyncResponsePublisher.publishResponseEvent(eventEnvelope);
- private class ChampProcessorWorker implements Runnable {
+ }
+ }
+ }
@Override
public void run() {
- while (true) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
- GraphEvent event = null;
- try {
- // Get the next event to be published from the queue.
- event = requestProcesserEventQueue.take();
- } catch (InterruptedException e) {
- // Restore the interrupted status.
- Thread.currentThread().interrupt();
- }
-
- // Parse the event and call champ Dao to process , Create the
- // response event and put it on response queue
- event.setResult(GraphEventResult.SUCCESS);
-
- // Check if this request is part of an ongoing DB transaction
- ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
- if ( (event.getDbTransactionId() != null) && (transaction == null) ) {
- event.setResult(GraphEventResult.FAILURE);
- event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
- event.setHttpErrorStatus(Status.BAD_REQUEST);
- }
-
- if (event.getResult() != GraphEventResult.FAILURE) {
- try {
- if (event.getVertex() != null) {
-
- switch (event.getOperation()) {
- case CREATE:
- event.setVertex(GraphEventVertex.fromChampObject(
- champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)),
- event.getVertex().getModelVersion()));
- break;
-
- case UPDATE:
- event.setVertex(GraphEventVertex.fromChampObject(
- champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)),
- event.getVertex().getModelVersion()));
- break;
- case DELETE:
- champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction));
- break;
- default:
- // log error
- }
- } else if (event.getEdge() != null) {
- switch (event.getOperation()) {
- case CREATE:
- event.setEdge(GraphEventEdge.fromChampRelationship(
- champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)),
- event.getEdge().getModelVersion()));
- break;
-
- case UPDATE:
- event.setEdge(GraphEventEdge.fromChampRelationship(champDataService
- .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)),
- event.getEdge().getModelVersion()));
-
- break;
- case DELETE:
- champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction));
- break;
- default:
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
- "Invalid operation for event transactionId: " + event.getTransactionId());
- }
-
- } else {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
- "Invalid payload for event transactionId: " + event.getTransactionId());
- }
- } catch (ChampServiceException champException) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
- event.setResult(GraphEventResult.FAILURE);
- event.setErrorMessage(champException.getMessage());
- event.setHttpErrorStatus(champException.getHttpStatus());
-
- } catch (Exception ex) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
- event.setResult(GraphEventResult.FAILURE);
- event.setErrorMessage(ex.getMessage());
- event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
- }
+ if (asyncRequestConsumer == null) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ "Unable to initialize ChampAsyncRequestProcessor");
}
- if (event.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult());
- } else {
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
- + event.getErrorMessage());
+ Iterable<String> events = null;
+ try {
+ events = asyncRequestConsumer.consume();
+ } catch (Exception e) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+ return;
}
- champAsyncResponsePublisher.publishResponseEvent(event);
-
- }
- }
- }
-
- @Override
- public void run() {
-
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
+ if (events == null || !events.iterator().hasNext()) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
- if (asyncRequestConsumer == null) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor");
- }
-
- Iterable<String> events = null;
- try {
- events = asyncRequestConsumer.consume();
- } catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
- return;
- }
-
- if (events == null || !events.iterator().hasNext()) {
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
+ }
- }
+ for (String event : events) {
+ try {
+ GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
+ GraphEvent requestEvent = requestEnvelope.getBody();
+ auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "Event received of type: " + requestEvent.getObjectType() + " with key: "
+ + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+ + " , operation: " + requestEvent.getOperation().toString());
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ "Event received of type: " + requestEvent.getObjectType() + " with key: "
+ + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+ + " , operation: " + requestEvent.getOperation().toString());
+ logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
+
+ // Try to submit the event to be published to the event bus.
+ if (!requestProcesserEventQueue.offer(requestEnvelope)) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
+ }
+
+ } catch (Exception e) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+ }
+ }
- for (String event : events) {
- try {
- GraphEvent requestEvent = GraphEvent.fromJson(event);
- auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
- + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
- + requestEvent.getOperation().toString());
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
- "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
- + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
- + requestEvent.getOperation().toString());
- logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
-
- // Try to submit the event to be published to the event bus.
- if (!requestProcesserEventQueue.offer(requestEvent)) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
- "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
+ try {
+ asyncRequestConsumer.commitOffsets();
+ } catch (OperationNotSupportedException e) {
+ // Dmaap doesnt support commit with offset
+ logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+ } catch (Exception e) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
}
- } catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
- }
}
- try {
- asyncRequestConsumer.commitOffsets();
- } catch(OperationNotSupportedException e) {
- //Dmaap doesnt support commit with offset
- logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
- }
- catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+ public Integer getRequestPollingTimeSeconds() {
+ return requestPollingTimeSeconds;
}
- }
-
-
-
- public Integer getRequestPollingTimeSeconds() {
- return requestPollingTimeSeconds;
- }
-
-
}