package org.onap.so.client.dmaap;
-import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.onap.so.client.dmaap.exceptions.DMaaPConsumerFailure;
import org.onap.so.client.dmaap.exceptions.ExceededMaximumPollingTime;
import org.onap.so.client.dmaap.rest.RestConsumer;
+import com.google.common.base.Stopwatch;
public abstract class DmaapConsumer extends DmaapClient {
static final int MAX_ELAPSED_TIME = 180000;
public boolean consume() throws Exception {
Consumer mrConsumer = this.getConsumer();
- boolean accepted = false;
Stopwatch stopwatch = Stopwatch.createUnstarted();
try {
while (this.continuePolling()) {
Iterable<String> itr = mrConsumer.fetch();
stopwatch.stop();
for (String message : itr) {
- if (!accepted && this.isAccepted(message)) {
+ if (this.isAccepted(message)) {
logger.info("accepted message found for " + this.getRequestId() + " on " + this.getTopic());
- accepted = true;
}
- if (accepted) {
- logger.info("received dmaap message: " + message);
- if (this.isFailure(message)) {
- this.stopProcessingMessages();
- final String errorMsg = "failure received from dmaap topic " + this.getTopic();
- logger.error(errorMsg);
- throw new DMaaPConsumerFailure(errorMsg);
- } else {
- this.processMessage(message);
- }
+ logger.info("received dmaap message: " + message);
+ if (this.isFailure(message)) {
+ this.stopProcessingMessages();
+ final String errorMsg = "failure received from dmaap topic " + this.getTopic();
+ logger.error(errorMsg);
+ throw new DMaaPConsumerFailure(errorMsg);
+ } else {
+ this.processMessage(message);
}
}
}