Fix Sonar bugs and possible NPE in code
[aai/spike.git] / src / main / java / org / onap / aai / spike / service / SpikeEventProcessor.java
index cd404b0..a18590a 100644 (file)
  */
 package org.onap.aai.spike.service;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import java.util.ArrayList;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.PriorityBlockingQueue;
 import javax.naming.OperationNotSupportedException;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.event.api.EventConsumer;
@@ -40,7 +40,6 @@ import org.onap.aai.spike.event.incoming.OffsetManager;
 import org.onap.aai.spike.event.outgoing.SpikeEventComparator;
 import org.onap.aai.spike.event.outgoing.SpikeEventExclusionStrategy;
 import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
-import org.onap.aai.spike.exception.SpikeException;
 import org.onap.aai.spike.logging.SpikeMsgs;
 import org.onap.aai.spike.util.SpikeConstants;
 import org.onap.aai.spike.util.SpikeProperties;
@@ -90,7 +89,7 @@ public class SpikeEventProcessor extends TimerTask {
         } catch (Exception ex) {
         }
 
-        eventQueue = new PriorityBlockingQueue<SpikeGraphEvent>(eventQueueCapacity, new SpikeEventComparator());
+        eventQueue = new PriorityBlockingQueue<>(eventQueueCapacity, new SpikeEventComparator());
         new Thread(new SpikeEventPublisher()).start();
 
         // Instantiate the offset manager. This will run a background thread that
@@ -106,6 +105,7 @@ public class SpikeEventProcessor extends TimerTask {
 
         if (consumer == null) {
             logger.error(SpikeMsgs.SPIKE_SERVICE_STARTED_FAILURE, SpikeConstants.SPIKE_SERVICE_NAME);
+            return;
         }
 
         Iterable<MessageWithOffset> events = null;
@@ -164,9 +164,11 @@ public class SpikeEventProcessor extends TimerTask {
                         + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());
                 logger.debug(SpikeMsgs.SPIKE_EVENT_PROCESSED, modelEventJson);
 
-            } catch (SpikeException | InterruptedException e) {
+            } catch (InterruptedException e) {
                 logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
                         e.getMessage() + ".  Incoming event payload:\n" + event.getMessage());
+                // Restore the interrupted status...
+                Thread.currentThread().interrupt();
             } catch (Exception e) {
                 logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
                         e.getMessage() + ".  Incoming event payload:\n" + event.getMessage());
@@ -252,9 +254,9 @@ public class SpikeEventProcessor extends TimerTask {
                     }
 
                 } catch (InterruptedException e) {
-
                     // Restore the interrupted status.
                     Thread.currentThread().interrupt();
+                    continue;
                 }
 
                 // Try publishing the event to the event bus. This call will block
@@ -288,7 +290,7 @@ public class SpikeEventProcessor extends TimerTask {
                     try {
                         Thread.sleep(60000);
                     } catch (InterruptedException e1) {
-                        e1.printStackTrace();
+                        Thread.currentThread().interrupt();
                     }
                 } catch (Exception e) {
                     logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, e.getMessage());