Replaced all tabs with spaces in java and pom.xml
[so.git] / common / src / main / java / org / onap / so / client / dmaap / DmaapConsumer.java
index c9acdd7..3dd0c75 100644 (file)
@@ -29,97 +29,104 @@ import org.onap.so.client.dmaap.rest.RestConsumer;
 
 public abstract class DmaapConsumer extends DmaapClient {
 
-       public DmaapConsumer() throws IOException {
-               super("dmaap/default-consumer.properties");
-       }
-
-       public Consumer getConsumer() {
-               return new RestConsumer(this.properties);
-       }
-
-       public boolean consume() throws Exception {
-           Consumer mrConsumer = this.getConsumer();
-               boolean accepted = false;
-               Stopwatch stopwatch = Stopwatch.createUnstarted();
-               try {
-                       while (this.continuePolling()) {
-                               if (stopwatch.elapsed(TimeUnit.MILLISECONDS) >= this.getMaximumElapsedTime()) {
-                                       final String message = "exceeded maximum retries on " + this.getRequestId() + " on " + this.getTopic();
-                                       logger.error(message);
-                                       throw new ExceededMaximumPollingTime(message);
-                               }
-                               stopwatch.start();
-                               Iterable<String> itr = mrConsumer.fetch();
-                               stopwatch.stop();
-                               for (String message : itr) {
-                                       if (!accepted && this.isAccepted(message)) {
-                                               logger.info("accepted message found for " + this.getRequestId() + " on " + this.getTopic());
-                                               accepted = true;
-                                       }
-                                       if (accepted) {
-                                               logger.info("received dmaap message: " + message);
-                                               if (this.isFailure(message)) {
-                                                       this.stopProcessingMessages();
-                                                       final String errorMsg = "failure received from dmaap topic " + this.getTopic();
-                                                       logger.error(errorMsg);
-                                                       throw new DMaaPConsumerFailure(errorMsg);
-                                               } else {
-                                                       this.processMessage(message);
-                                               }
-                                       }
-                               }
-                       }
-                       return true;
-               } finally {
-                       if (stopwatch.isRunning()) {
-                               stopwatch.stop();
-                       }
-               }
-       }
-
-       /**
-        * Should this consumer continue to consume messages from the topic?
-        * @return
-        */
-       public abstract boolean continuePolling();
-       /**
-        * Process a message from a DMaaP topic
-        *
-        * @param message
-        * @throws Exception
-        */
-       public abstract void processMessage(String message) throws Exception;
-       /**
-        * Has the request been accepted by the receiving system?
-        * Should the consumer move to processing messages?
-        *
-        * @param message
-        * @return
-        */
-       public abstract boolean isAccepted(String message);
-       /**
-        * has the request failed?
-        *
-        * @param message
-        * @return
-        */
-       public abstract boolean isFailure(String message);
-       /**
-        * The request id to filter messages on
-        * @return
-        */
-       public abstract String getRequestId();
-       /**
-        * Logic that defines when the consumer should stop processing messages
-        */
-       public abstract void stopProcessingMessages();
-
-       /**
-        * time in milliseconds
-        */
-       public int getMaximumElapsedTime() {
-               return 180000;
-       }
+    public DmaapConsumer() throws IOException {
+        super("dmaap/default-consumer.properties");
+    }
+
+    public Consumer getConsumer() {
+        return new RestConsumer(this.properties);
+    }
+
+    public boolean consume() throws Exception {
+        Consumer mrConsumer = this.getConsumer();
+        boolean accepted = false;
+        Stopwatch stopwatch = Stopwatch.createUnstarted();
+        try {
+            while (this.continuePolling()) {
+                if (stopwatch.elapsed(TimeUnit.MILLISECONDS) >= this.getMaximumElapsedTime()) {
+                    final String message =
+                            "exceeded maximum retries on " + this.getRequestId() + " on " + this.getTopic();
+                    logger.error(message);
+                    throw new ExceededMaximumPollingTime(message);
+                }
+                stopwatch.start();
+                Iterable<String> itr = mrConsumer.fetch();
+                stopwatch.stop();
+                for (String message : itr) {
+                    if (!accepted && this.isAccepted(message)) {
+                        logger.info("accepted message found for " + this.getRequestId() + " on " + this.getTopic());
+                        accepted = true;
+                    }
+                    if (accepted) {
+                        logger.info("received dmaap message: " + message);
+                        if (this.isFailure(message)) {
+                            this.stopProcessingMessages();
+                            final String errorMsg = "failure received from dmaap topic " + this.getTopic();
+                            logger.error(errorMsg);
+                            throw new DMaaPConsumerFailure(errorMsg);
+                        } else {
+                            this.processMessage(message);
+                        }
+                    }
+                }
+            }
+            return true;
+        } finally {
+            if (stopwatch.isRunning()) {
+                stopwatch.stop();
+            }
+        }
+    }
+
+    /**
+     * Should this consumer continue to consume messages from the topic?
+     * 
+     * @return
+     */
+    public abstract boolean continuePolling();
+
+    /**
+     * Process a message from a DMaaP topic
+     *
+     * @param message
+     * @throws Exception
+     */
+    public abstract void processMessage(String message) throws Exception;
+
+    /**
+     * Has the request been accepted by the receiving system? Should the consumer move to processing messages?
+     *
+     * @param message
+     * @return
+     */
+    public abstract boolean isAccepted(String message);
+
+    /**
+     * has the request failed?
+     *
+     * @param message
+     * @return
+     */
+    public abstract boolean isFailure(String message);
+
+    /**
+     * The request id to filter messages on
+     * 
+     * @return
+     */
+    public abstract String getRequestId();
+
+    /**
+     * Logic that defines when the consumer should stop processing messages
+     */
+    public abstract void stopProcessingMessages();
+
+    /**
+     * time in milliseconds
+     */
+    public int getMaximumElapsedTime() {
+        return 180000;
+    }