Close old UEB/DMaaP consumer 41/43541/4
authorJim Hahn <jrh3@att.com>
Wed, 18 Apr 2018 15:12:54 +0000 (11:12 -0400)
committerJorge Hernandez <jh1730@att.com>
Thu, 19 Apr 2018 19:04:43 +0000 (19:04 +0000)
Modified code to close the old consumer when the filter is changed.
Made some changes to toString() methods to resolve some sonar
issues.
Modified so-as to not interfere with fetch().
Use synchronized instead of AtomicReferences.

Change-Id: I4c9d2cc32993208fd345e66ef1f1dce7a1e7de7d
Issue-ID: POLICY-750
Signed-off-by: Jim Hahn <jrh3@att.com>
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java

index 70c37d5..a299060 100644 (file)
@@ -27,7 +27,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
 import org.onap.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
 import org.onap.policy.drools.properties.PolicyProperties;
 import org.slf4j.Logger;
@@ -89,11 +88,21 @@ public interface BusConsumer {
      * Used to build the consumer.
      */
     private final ConsumerBuilder builder;
+    
+    /**
+     * Locked while updating {@link #consumer} and {@link #newConsumer}.
+     */
+    private final Object consLocker = new Object();
 
     /**
      * Cambria client
      */
-    protected volatile CambriaConsumer consumer;
+    private CambriaConsumer consumer;
+
+    /**
+     * Cambria client to use for next fetch
+     */
+    private CambriaConsumer newConsumer = null;
 
     /**
      * fetch timeout
@@ -168,7 +177,7 @@ public interface BusConsumer {
     @Override
     public Iterable<String> fetch() throws IOException, InterruptedException {
       try {
-        return this.consumer.fetch();
+        return getCurrentConsumer().fetch();
       } catch (final IOException e) {
         logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
             this.fetchTimeout);
@@ -186,7 +195,29 @@ public interface BusConsumer {
         closeCondition.notifyAll();
       }
 
-      this.consumer.close();
+      getCurrentConsumer().close();
+    }
+
+    private CambriaConsumer getCurrentConsumer() {
+        CambriaConsumer old = null;
+        CambriaConsumer ret;
+        
+        synchronized(consLocker) {
+            if(this.newConsumer != null) {
+                // replace old consumer with new consumer
+                old = this.consumer;
+                this.consumer = this.newConsumer;
+                this.newConsumer = null;
+            }
+            
+            ret = this.consumer;
+        }
+        
+        if(old != null) {
+            old.close();
+        }
+        
+        return ret;
     }
 
     @Override
@@ -195,18 +226,29 @@ public interface BusConsumer {
         builder.withServerSideFilter(filter);
 
         try {
-            consumer = builder.build();
+            CambriaConsumer previous;
+            synchronized(consLocker) {
+                previous = this.newConsumer;
+                this.newConsumer = builder.build();
+            }
+            
+            if(previous != null) {
+                // there was already a new consumer - close it
+                previous.close();
+            }
             
         } catch (MalformedURLException | GeneralSecurityException e) {
+            /*
+             * Since an exception occurred, "consumer" still has its old value,
+             * thus it should not be closed at this point.
+             */
             throw new IllegalArgumentException(e);
         }
     }
 
     @Override
     public String toString() {
-      final StringBuilder builder = new StringBuilder();
-      builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]");
-      return builder.toString();
+      return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
     }
   }
 
@@ -311,13 +353,10 @@ public interface BusConsumer {
 
     @Override
     public String toString() {
-      final StringBuilder builder = new StringBuilder();
-      builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=")
-          .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=")
-          .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost())
-          .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag())
-          .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]");
-      return builder.toString();
+            return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
+                            + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()="
+                            + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag()
+                            + ", consumer.getUsername()=" + consumer.getUsername() + "]";
     }
   }
 
@@ -378,15 +417,12 @@ public interface BusConsumer {
 
     @Override
     public String toString() {
-      final StringBuilder builder = new StringBuilder();
       final MRConsumerImpl consumer = this.consumer;
 
-      builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=")
-          .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=")
-          .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost())
-          .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag())
-          .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]");
-      return builder.toString();
+            return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
+                            + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()="
+                            + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag()
+                            + ", consumer.getUsername()=" + consumer.getUsername() + "]";
     }
   }