revert sonar fix 08/85508/1
authorsunil.unnava <sunil.unnava@att.com>
Tue, 16 Apr 2019 18:13:39 +0000 (14:13 -0400)
committersunil.unnava <sunil.unnava@att.com>
Tue, 16 Apr 2019 18:14:02 +0000 (14:14 -0400)
Issue-ID: DMAAP-1169
Change-Id: Id81fc8e8acc485184fbe09b6b33aa050ba1daf21
Signed-off-by: sunil.unnava <sunil.unnava@att.com>
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java
src/main/java/org/onap/dmaap/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java

index 4e54a3c..67a8af9 100644 (file)
@@ -113,13 +113,13 @@ public class Kafka011Consumer implements Consumer {
        public synchronized Consumer.Message nextMessage() {
 
                try {
-                       if (fPendingMsgs.isEmpty()) {
+                       if (!fPendingMsgs.isEmpty()) {
                                return makeMessage(fPendingMsgs.take());
                        }
                } catch (InterruptedException x) {
                        log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
                                        x);
-                       //Thread.currentThread().interrupt();
+                       Thread.currentThread().interrupt();
                }
 
                Callable<Boolean> run = new Callable<Boolean>() {
index 00b6a75..647f4ce 100644 (file)
@@ -299,7 +299,7 @@ public class KafkaConsumerCache {
                                curator.blockUntilConnected();
                        } catch (InterruptedException e) {
                                log.error("error while setting curator framework :",e);
-                               //Thread.currentThread().interrupt();
+                               Thread.currentThread().interrupt();
                        }
                }
 
@@ -512,7 +512,7 @@ public class KafkaConsumerCache {
                        Thread.sleep(consumerHandoverWaitMs);
                } catch (InterruptedException e) {
                        log.error("InterruptedException in dropTimedOutConsumer",e);
-                       //Thread.currentThread().interrupt();
+                       Thread.currentThread().interrupt();
                }
                log.info("Dropped " + key + " consumer due to timeout");
        }
index 01c46e7..f645c8d 100644 (file)
@@ -154,7 +154,7 @@ public class DMaaPCambriaLimiter {
                                }
                        } catch (InterruptedException e) {
                                log.error("Exception "+ e);
-                               //Thread.currentThread().interrupt();
+                               Thread.currentThread().interrupt();
                        }
                        
                        
index 9c1fcb2..f59f704 100644 (file)
@@ -198,7 +198,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
                        }
                } catch (InterruptedException e) {
                        getLog().warn("Possible message loss. " + e.getMessage(), e);
-                       //Thread.currentThread().interrupt();
+                       Thread.currentThread().interrupt();
                } catch (IOException e) {
                        getLog().warn("Possible message loss. " + e.getMessage(), e);
                }
index 27ffc89..537fc22 100644 (file)
@@ -333,7 +333,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
                                        // FIXME: would be good to wait/signal
                                        Thread.sleep(100);
                                } catch (InterruptedException e) {
-                                 //  Thread.currentThread().interrupt();
+                                   Thread.currentThread().interrupt();
                                }
                        }
                });