Fix potential null pointers
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncRequestProcessor.java
index 346bd02..e091ee9 100644 (file)
@@ -269,55 +269,52 @@ public class ChampAsyncRequestProcessor extends TimerTask {
         if (asyncRequestConsumer == null) {
             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
                     "Unable to initialize ChampAsyncRequestProcessor");
-        }
+        } else {
+            try {
+                Iterable<String> events = asyncRequestConsumer.consume();
+
+                if (events == null || !events.iterator().hasNext()) {
+                    logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
+                } else {
+                    processEvents(events);
+                }
 
-        Iterable<String> events = null;
-        try {
-            events = asyncRequestConsumer.consume();
-        } catch (Exception e) {
-            logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
-            return;
+                asyncRequestConsumer.commitOffsets();
+            } catch (OperationNotSupportedException e) {
+                // Dmaap doesnt support commit with offset
+                logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+            } catch (Exception e) {
+                logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+            }
         }
 
-        if (events == null || !events.iterator().hasNext()) {
-            logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
+    }
 
-        } else {
-            for (String event : events) {
-                try {
-                    GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
-                    GraphEvent requestEvent = requestEnvelope.getBody();
-                    auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+    private void processEvents(Iterable<String> events) {
+        for (String event : events) {
+            try {
+                GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
+                GraphEvent requestEvent = requestEnvelope.getBody();
+                auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
                         "Event received of type: " + requestEvent.getObjectType() + " with key: "
-                            + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
-                            + " , operation: " + requestEvent.getOperation().toString());
-                    logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+                                + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+                                + " , operation: " + requestEvent.getOperation().toString());
+                logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
                         "Event received of type: " + requestEvent.getObjectType() + " with key: "
-                            + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
-                            + " , operation: " + requestEvent.getOperation().toString());
-                    logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
+                                + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+                                + " , operation: " + requestEvent.getOperation().toString());
+                logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
 
-                    // Try to submit the event to be published to the event bus.
-                    if (!requestProcesserEventQueue.offer(requestEnvelope)) {
-                        logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+                // Try to submit the event to be published to the event bus.
+                if (!requestProcesserEventQueue.offer(requestEnvelope)) {
+                    logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
                             "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
-                    }
-
-                } catch (Exception e) {
-                    logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
                 }
-            }
-        }
 
-        try {
-            asyncRequestConsumer.commitOffsets();
-        } catch (OperationNotSupportedException e) {
-            // Dmaap doesnt support commit with offset
-            logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
-        } catch (Exception e) {
-            logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+            } catch (Exception e) {
+                logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+            }
         }
-
     }
 
     public Integer getRequestPollingTimeSeconds() {