IntegrityMonitory - remove latch for non-test code 47/55147/1
authorJim Hahn <jrh3@att.com>
Thu, 14 Jun 2018 13:05:50 +0000 (09:05 -0400)
committerJim Hahn <jrh3@att.com>
Wed, 20 Jun 2018 19:08:05 +0000 (15:08 -0400)
Change-Id: I95ebcf8aabfa01eb6453a4ba5dd88d11c8f025c9
Issue-ID: POLICY-908
Signed-off-by: Jim Hahn <jrh3@att.com>
integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java

index c32a221..38dc20d 100644 (file)
@@ -29,10 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import javax.management.JMX;
 import javax.management.MBeanServerConnection;
 import javax.persistence.EntityManager;
@@ -43,7 +40,6 @@ import javax.persistence.LockModeType;
 import javax.persistence.Persistence;
 import javax.persistence.Query;
 import javax.validation.constraints.NotNull;
-
 import org.onap.policy.common.im.jmx.ComponentAdmin;
 import org.onap.policy.common.im.jmx.ComponentAdminMBean;
 import org.onap.policy.common.im.jmx.JmxAgentConnection;
@@ -196,7 +192,7 @@ public class IntegrityMonitor {
      */
     protected IntegrityMonitor(String resourceName, Properties properties) throws IntegrityMonitorException {
 
-        this(resourceName, properties, null);
+        this(resourceName, properties, new Factory());
     }
 
     /**
@@ -207,10 +203,10 @@ public class IntegrityMonitor {
      * 
      * @param resourceName The resource name of the resource
      * @param properties a set of properties passed in from the resource
-     * @param queue queue to use to control the FPManager thread, or {@code null}
+     * @param factory Factory to use to control the FPManager thread
      * @throws IntegrityMonitorException if any errors are encountered in the constructor
      */
-    protected IntegrityMonitor(String resourceName, Properties properties, BlockingQueue<CountDownLatch> queue)
+    protected IntegrityMonitor(String resourceName, Properties properties, Factory factory)
             throws IntegrityMonitorException {
 
         // singleton check since this constructor can be called from a child or
@@ -357,7 +353,8 @@ public class IntegrityMonitor {
             logger.error("ComponentAdmin constructor exception: {}", e.toString(), e);
         }
 
-        fpManager = new FpManager(queue);
+        fpManager = new FpManager(factory);
+        fpManager.start();
 
     }
 
@@ -373,7 +370,7 @@ public class IntegrityMonitor {
      */
     public static IntegrityMonitor getInstance(String resourceName, Properties properties)
             throws IntegrityMonitorException {
-        return getInstance(resourceName, properties, null);
+        return getInstance(resourceName, properties, new Factory());
     }
 
     /**
@@ -382,13 +379,13 @@ public class IntegrityMonitor {
      * 
      * @param resourceName The resource name of the resource
      * @param properties a set of properties passed in from the resource
-     * @param queue queue to use to control the FPManager thread, or {@code null}
+     * @param factory Factory to use to control the FPManager thread
      * @return The new instance of IntegrityMonitor
      * @throws IntegrityMonitorException if unable to create jmx url or the constructor returns an
      *         exception
      */
     protected static IntegrityMonitor getInstance(String resourceName, Properties properties,
-            BlockingQueue<CountDownLatch> queue) throws IntegrityMonitorException {
+                    Factory factory) throws IntegrityMonitorException {
 
         synchronized (getInstanceLock) {
             logger.debug("getInstance() called - resourceName= {}", resourceName);
@@ -399,7 +396,7 @@ public class IntegrityMonitor {
 
             if (instance == null) {
                 logger.debug("Creating new instance of IntegrityMonitor");
-                instance = new IntegrityMonitor(resourceName, properties, queue);
+                instance = new IntegrityMonitor(resourceName, properties, factory);
             }
             return instance;
         }
@@ -1740,18 +1737,15 @@ public class IntegrityMonitor {
      * dependencies, does a refresh state audit and runs the stateAudit.
      */
     class FpManager extends Thread {
-        private final CountDownLatch stopper = new CountDownLatch(1);
+        private boolean stopRequested = false;
 
-        private BlockingQueue<CountDownLatch> queue;
-        private CountDownLatch progressLatch = null;
+        private final Factory factory;
 
         // Constructor - start FP manager thread
-        FpManager(BlockingQueue<CountDownLatch> queue) {
-            this.queue = queue;
+        FpManager(Factory factory) {
+            this.factory = factory;
             // set now as the last time the refreshStateAudit ran
             IntegrityMonitor.this.refreshStateAuditLastRunDate = new Date();
-            // start thread
-            this.start();
         }
 
         @Override
@@ -1759,13 +1753,13 @@ public class IntegrityMonitor {
             logger.debug("FPManager thread running");
 
             try {
-                getLatch();
-                decrementLatch();
+                factory.runStarted();
 
-                while (!stopper.await(cycleIntervalMillis, TimeUnit.MILLISECONDS)) {
-                    getLatch();
+                while(!stopRequested) {
+                    factory.doSleep(cycleIntervalMillis);
+                    
                     IntegrityMonitor.this.runOnce();
-                    decrementLatch();
+                    factory.monitorCompleted();
                 }
 
             } catch (InterruptedException e) {
@@ -1775,31 +1769,9 @@ public class IntegrityMonitor {
         }
 
         public void stopAndExit() {
-            stopper.countDown();
+            stopRequested = true;
             this.interrupt();
         }
-
-        /**
-         * Gets the next latch from the queue.
-         * 
-         * @throws InterruptedException
-         * 
-         */
-        private void getLatch() throws InterruptedException {
-            if (queue != null) {
-                progressLatch = queue.take();
-            }
-        }
-
-        /**
-         * Decrements the current latch.
-         */
-        private void decrementLatch() {
-            if (progressLatch != null) {
-                progressLatch.countDown();
-            }
-        }
-
     }
 
     private void runOnce() {
@@ -1934,6 +1906,40 @@ public class IntegrityMonitor {
         return allNotWellMap;
     }
 
+    /**
+     * Used to access various objects. Overridden by junit tests.
+     */
+    public static class Factory {
+
+        /**
+         * Indicates that the {@link FpManager#run()} method has started. This method
+         * simply returns.
+         * 
+         * @throws InterruptedException
+         */
+        public void runStarted() throws InterruptedException {
+            // does nothing
+        }
+
+        /**
+         * Sleeps for a period of time.
+         * @param sleepMs   amount of time to sleep, in milliseconds
+         * @throws InterruptedException 
+         */
+        public void doSleep(long sleepMs) throws InterruptedException {
+            Thread.sleep(sleepMs);
+        }
+
+        /**
+         * Indicates that a monitor activity has completed. This method simply returns.
+         * 
+         * @throws InterruptedException
+         */
+        public void monitorCompleted() throws InterruptedException {
+            // does nothing
+        }
+    }
+
     /*
      * The remaining methods are used by JUnit tests.
      */
index 7f1e551..091dcc9 100644 (file)
@@ -22,23 +22,19 @@ package org.onap.policy.common.im;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import java.util.concurrent.Semaphore;
 import javax.persistence.EntityTransaction;
 import javax.persistence.Query;
 import javax.persistence.TemporalType;
-
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.onap.policy.common.im.IntegrityMonitor.Factory;
 import org.onap.policy.common.im.jpa.ForwardProgressEntity;
 import org.onap.policy.common.im.jpa.ResourceRegistrationEntity;
 import org.onap.policy.common.im.jpa.StateManagementEntity;
@@ -57,7 +53,8 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase {
     private static EntityTransaction et;
     private static String resourceName;
 
-    private BlockingQueue<CountDownLatch> queue;
+    private Semaphore monitorSem;
+    private Semaphore junitSem;
 
     /**
      * Set up for test class.
@@ -900,9 +897,36 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase {
     private IntegrityMonitor makeMonitor(String resourceName, Properties myProp) throws Exception {
         IntegrityMonitor.deleteInstance();
 
-        queue = new LinkedBlockingQueue<>();
+        monitorSem = new Semaphore(0);
+        junitSem = new Semaphore(0);
+        
+        Factory factory = new IntegrityMonitor.Factory() {
+
+            @Override
+            public void doSleep(long sleepMs) throws InterruptedException {
+                /*
+                 * No need to sleep, as the thread won't progress until the
+                 * semaphore is released.
+                 */
+            }
+
+            @Override
+            public void runStarted() throws InterruptedException {
+                monitorSem.acquire();
+                
+                junitSem.release();
+                monitorSem.acquire();
+            }
+
+            @Override
+            public void monitorCompleted() throws InterruptedException {
+                junitSem.release();
+                monitorSem.acquire();
+            }
+            
+        };
 
-        IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, queue);
+        IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, factory);
 
         // wait for the monitor thread to start
         waitStep();
@@ -916,8 +940,7 @@ public class IntegrityMonitorTest extends IntegrityMonitorTestBase {
      * @throws InterruptedException if the thread is interrupted
      */
     private void waitStep() throws InterruptedException {
-        CountDownLatch latch = new CountDownLatch(1);
-        queue.offer(latch);
-        waitLatch(latch);
+        monitorSem.release();
+        waitSem(junitSem);
     }
 }
index 0c8259b..e556230 100644 (file)
@@ -22,17 +22,14 @@ package org.onap.policy.common.im;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Persistence;
-
 import org.onap.policy.common.utils.jpa.EntityTransCloser;
 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
 import org.slf4j.Logger;
@@ -243,14 +240,14 @@ public class IntegrityMonitorTestBase {
     }
 
     /**
-     * Waits for a latch to reach zero.
+     * Waits for a semaphore to be acquired
      * 
-     * @param latch the latch
+     * @param sem the latch
      * @throws InterruptedException if the thread is interrupted
      * @throws AssertionError if the latch did not reach zero in the allotted time
      */
-    protected void waitLatch(CountDownLatch latch) throws InterruptedException {
-        assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS));
+    protected void waitSem(Semaphore sem) throws InterruptedException {
+        assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.MILLISECONDS));
     }
 
     /**