Merge "Reorder modifiers"
[so.git] / common / src / main / java / org / openecomp / mso / client / dmaap / DmaapConsumer.java
index 0339516..6a01fb6 100644 (file)
 
 package org.openecomp.mso.client.dmaap;
 
-import java.io.FileNotFoundException;
+import com.google.common.base.Stopwatch;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
-
 import org.openecomp.mso.client.dmaap.exceptions.DMaaPConsumerFailure;
 import org.openecomp.mso.client.dmaap.exceptions.ExceededMaximumPollingTime;
 import org.openecomp.mso.client.dmaap.rest.RestConsumer;
 
-import com.google.common.base.Stopwatch;
-
 public abstract class DmaapConsumer extends DmaapClient {
 
-       public DmaapConsumer() throws FileNotFoundException, IOException {
+       public DmaapConsumer() throws IOException {
                super("dmaap/default-consumer.properties");
        }
-       
-       public Consumer getConsumer() throws FileNotFoundException, IOException {
+
+       public Consumer getConsumer() {
                return new RestConsumer(this.properties);
        }
+
        public boolean consume() throws Exception {
-               
-               Consumer mrConsumer = this.getConsumer();
-               int iterations = 0;
+           Consumer mrConsumer = this.getConsumer();
                boolean accepted = false;
                Stopwatch stopwatch = Stopwatch.createUnstarted();
                try {
@@ -59,32 +55,28 @@ public abstract class DmaapConsumer extends DmaapClient {
                                        if (!accepted && this.isAccepted(message)) {
                                                auditLogger.info("accepted message found for " + this.getRequestId() + " on " + this.getTopic());
                                                accepted = true;
-                                       } 
+                                       }
                                        if (accepted) {
+                                               auditLogger.info("received dmaap message: " + message);
                                                if (this.isFailure(message)) {
                                                        this.stopProcessingMessages();
-                                                       auditLogger.info("received dmaap message: " + message);
                                                        final String errorMsg = "failure received from dmaap topic " + this.getTopic();
                                                        auditLogger.error(errorMsg);
                                                        throw new DMaaPConsumerFailure(errorMsg);
                                                } else {
-                                                       auditLogger.info("received dmaap message: " + message);
                                                        this.processMessage(message);
                                                }
                                        }
                                }
-                               iterations++;
                        }
                        return true;
-               } catch (Exception e ) {
-                       throw e;
                } finally {
                        if (stopwatch.isRunning()) {
                                stopwatch.stop();
                        }
                }
        }
-       
+
        /**
         * Should this consumer continue to consume messages from the topic?
         * @return
@@ -92,7 +84,7 @@ public abstract class DmaapConsumer extends DmaapClient {
        public abstract boolean continuePolling();
        /**
         * Process a message from a DMaaP topic
-        * 
+        *
         * @param message
         * @throws Exception
         */
@@ -100,14 +92,14 @@ public abstract class DmaapConsumer extends DmaapClient {
        /**
         * Has the request been accepted by the receiving system?
         * Should the consumer move to processing messages?
-        * 
+        *
         * @param message
         * @return
         */
        public abstract boolean isAccepted(String message);
        /**
         * has the request failed?
-        * 
+        *
         * @param message
         * @return
         */
@@ -121,11 +113,14 @@ public abstract class DmaapConsumer extends DmaapClient {
         * Logic that defines when the consumer should stop processing messages
         */
        public abstract void stopProcessingMessages();
-       
+
        /**
         * time in milliseconds
         */
        public int getMaximumElapsedTime() {
                return 180000;
        }
+
+
+
 }