IntegrityAudit - remove latches from non-test code 51/55151/1
authorJim Hahn <jrh3@att.com>
Wed, 13 Jun 2018 16:23:26 +0000 (12:23 -0400)
committerJim Hahn <jrh3@att.com>
Wed, 20 Jun 2018 19:19:50 +0000 (15:19 -0400)
Change-Id: I5d5cc7d581f78d5551e2fe7447720403bb63ada2
Issue-ID: POLICY-908
Signed-off-by: Jim Hahn <jrh3@att.com>
integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java

index f1839b1..58ed8b9 100644 (file)
@@ -25,10 +25,7 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import org.onap.policy.common.ia.jpa.IntegrityAuditEntity;
 import org.onap.policy.common.logging.eelf.MessageCodes;
 import org.onap.policy.common.logging.flexlogger.FlexLogger;
@@ -107,18 +104,6 @@ public class AuditThread extends Thread {
      */
     private IntegrityAudit integrityAudit;
 
-    /**
-     * A latch is taken from this queue before starting an audit. May be {@code null}. Used by JUnit
-     * tests.
-     */
-    private BlockingQueue<CountDownLatch> auditLatchQueue;
-
-    /**
-     * Latch to be decremented when the next audit completes. May be {@code null}. Used by JUnit
-     * tests to wait for an audit to complete.
-     */
-    private CountDownLatch auditCompletionLatch = null;
-
     /**
      * AuditThread constructor.
      * 
@@ -133,7 +118,7 @@ public class AuditThread extends Thread {
             int integrityAuditPeriodSeconds, IntegrityAudit integrityAudit) throws IntegrityAuditException {
 
         this(resourceName, persistenceUnit, properties, TimeUnit.SECONDS.toMillis(integrityAuditPeriodSeconds),
-                integrityAudit, null);
+                integrityAudit);
     }
 
     /**
@@ -148,13 +133,12 @@ public class AuditThread extends Thread {
      * @throws IntegrityAuditException if an error occurs
      */
     public AuditThread(String resourceName, String persistenceUnit, Properties properties, long integrityAuditMillis,
-            IntegrityAudit integrityAudit, BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException {
+            IntegrityAudit integrityAudit) throws IntegrityAuditException {
         this.resourceName = resourceName;
         this.persistenceUnit = persistenceUnit;
         this.properties = properties;
         this.integrityAuditPeriodMillis = integrityAuditMillis;
         this.integrityAudit = integrityAudit;
-        this.auditLatchQueue = queue;
 
         /*
          * The DbDAO Constructor registers this node in the IntegrityAuditEntity table. Each
@@ -174,14 +158,8 @@ public class AuditThread extends Thread {
         logger.info("AuditThread.run: Entering");
 
         try {
-            /*
-             * For JUnit testing: wait for the first latch, decrement it to indicate that the thread
-             * has started, and then wait for the next latch, before we actually start doing
-             * anything. These simply return if there is no latch queue defined.
-             */
-            getNextLatch();
-            decrementLatch();
-            getNextLatch();
+            // for junit testing
+            runStarted();
 
             /*
              * Triggers change in designation, unless no other viable candidate.
@@ -284,11 +262,8 @@ public class AuditThread extends Thread {
                      * property, otherwise just sleep the normal interval.
                      */
                     if (auditCompleted) {
-                        // indicate that an audit has completed
-                        decrementLatch();
-
-                        // don't start the next audit cycle until a latch has been provided
-                        getNextLatch();
+                        // for junit testing: indicate that an audit has completed
+                        auditCompleted();
 
                         if (logger.isDebugEnabled()) {
                             logger.debug("AuditThread.run: Audit completed; resourceName=" + this.resourceName
@@ -341,29 +316,6 @@ public class AuditThread extends Thread {
         logger.info("AuditThread.run: Exiting");
     }
 
-    /**
-     * Gets the next audit-completion latch from the queue. Blocks, if the queue is empty.
-     * 
-     * @throws InterruptedException if interrupted while waiting
-     */
-    private void getNextLatch() throws InterruptedException {
-        BlockingQueue<CountDownLatch> queue = this.auditLatchQueue;
-        if (queue != null) {
-            this.auditCompletionLatch = queue.take();
-        }
-    }
-
-    /**
-     * Decrements the current audit-completion latch, if any.
-     */
-    private void decrementLatch() {
-        CountDownLatch latch = this.auditCompletionLatch;
-        if (latch != null) {
-            this.auditCompletionLatch = null;
-            latch.countDown();
-        }
-    }
-
     /**
      * Determines if an exception is an InterruptedException or was caused by an
      * InterruptedException.
@@ -787,6 +739,26 @@ public class AuditThread extends Thread {
 
     }
 
+    /**
+     * Indicates that the {@link #run()} method has started. This method simply returns,
+     * and may overridden by junit tests.
+     * 
+     * @throws InterruptedException
+     */
+    public void runStarted() throws InterruptedException {
+        // does nothing
+    }
+
+    /**
+     * Indicates that an audit has completed. This method simply returns, and may
+     * overridden by junit tests.
+     * 
+     * @throws InterruptedException
+     */
+    public void auditCompleted() throws InterruptedException {
+        // does nothing
+    }
+
     /**
      * Adjusts the thread-sleep-interval to be used when an audit has <i>not</i> been completed.
      * Used by JUnit tests.
index 9abdbe5..b3330fa 100644 (file)
@@ -21,9 +21,6 @@
 package org.onap.policy.common.ia;
 
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-
 import org.onap.policy.common.ia.IntegrityAuditProperties.NodeTypeEnum;
 import org.onap.policy.common.logging.flexlogger.FlexLogger;
 import org.onap.policy.common.logging.flexlogger.Logger;
@@ -217,37 +214,20 @@ public class IntegrityAudit {
      * @throws IntegrityAuditException if an error occurs
      */
     public void startAuditThread() throws IntegrityAuditException {
-        startAuditThread(null);
-    }
-
-    /**
-     * Starts the audit thread.
-     * 
-     * @param queue the queue
-     * @return {@code true} if the thread was started, {@code false} otherwise
-     * @throws IntegrityAuditException if an error occurs
-     */
-    protected boolean startAuditThread(BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException {
-
         logger.info("startAuditThread: Entering");
 
-        boolean success = false;
-
         if (integrityAuditPeriodMillis >= 0) {
-            this.auditThread = new AuditThread(this.resourceName, this.persistenceUnit, this.properties,
-                    integrityAuditPeriodMillis, this, queue);
+            this.auditThread = makeAuditThread(this.resourceName, this.persistenceUnit, this.properties, integrityAuditPeriodMillis);
             logger.info("startAuditThread: Audit started and will run every " + integrityAuditPeriodMillis / 1000
                     + " seconds");
             this.auditThread.start();
-            success = true;
+            
         } else {
             logger.info("startAuditThread: Suppressing integrity audit, integrityAuditPeriodSeconds="
                     + integrityAuditPeriodMillis / 1000);
         }
 
         logger.info("startAuditThread: Exiting");
-
-        return success;
     }
 
     /**
@@ -299,4 +279,29 @@ public class IntegrityAudit {
             return !this.auditThread.isAlive();
         }
     }
+
+    /**
+     * 
+     * @return {@code true} if an audit thread exists, {@code false} otherwise
+     */
+    protected boolean haveAuditThread() {
+        return (this.auditThread != null);
+    }
+
+    /**
+     * Creates an audit thread. May be overridden by junit tests.
+     * 
+     * @param resourceName2
+     * @param persistenceUnit2
+     * @param properties2
+     * @param integrityAuditPeriodMillis2
+     * 
+     * @return a new audit thread
+     * @throws IntegrityAuditException
+     */
+    protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2,
+                    long integrityAuditPeriodMillis2) throws IntegrityAuditException {
+        
+        return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this);
+    }
 }
index afbcc45..c917990 100644 (file)
@@ -22,10 +22,6 @@ package org.onap.policy.common.ia;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -34,21 +30,19 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.EntityTransaction;
 import javax.persistence.Persistence;
-
 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
 import org.onap.policy.common.utils.jpa.EntityMgrFactoryCloser;
 import org.onap.policy.common.utils.jpa.EntityTransCloser;
 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
 import org.slf4j.LoggerFactory;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
 
 /**
  * All JUnits are designed to run in the local development environment where they have write
@@ -464,26 +458,26 @@ public class IntegrityAuditTestBase {
     protected void runAudit(MyIntegrityAudit... auditors) throws InterruptedException {
 
         // start an audit cycle on each auditor
-        List<CountDownLatch> latches = new ArrayList<>(auditors.length);
+        List<Semaphore> semaphores = new ArrayList<>(auditors.length);
         for (MyIntegrityAudit p : auditors) {
-            latches.add(p.startAudit());
+            semaphores.add(p.startAudit());
         }
 
         // wait for each auditor to complete its cycle
-        for (CountDownLatch latch : latches) {
-            waitLatch(latch);
+        for (Semaphore sem : semaphores) {
+            waitSem(sem);
         }
     }
 
     /**
-     * Waits for a latch to reach zero.
+     * Waits for a semaphore to be released.
      * 
-     * @param latch the latch to wait for
+     * @param sem the semaphore for which to wait
      * @throws InterruptedException if the thread is interrupted
      * @throws AssertionError if the latch did not reach zero in the allotted time
      */
-    protected void waitLatch(CountDownLatch latch) throws InterruptedException {
-        assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS));
+    protected void waitSem(Semaphore sem) throws InterruptedException {
+        assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.SECONDS));
     }
 
     /**
@@ -520,11 +514,16 @@ public class IntegrityAuditTestBase {
      * Manages audits by inserting latches into a queue for the AuditThread to count.
      */
     protected class MyIntegrityAudit extends IntegrityAudit {
-
+        
+        /**
+         * Semaphore on which the audit thread should wait.
+         */
+        private Semaphore auditSem = null;
+        
         /**
-         * Queue from which the AuditThread will take latches.
+         * Semaphore on which the junit management thread should wait.
          */
-        private BlockingQueue<CountDownLatch> queue = null;
+        private Semaphore junitSem = null;
 
         /**
          * Constructs an auditor and starts the AuditThread.
@@ -550,16 +549,14 @@ public class IntegrityAuditTestBase {
         }
 
         /**
-         * Triggers an audit by adding a latch to the queue.
+         * Triggers an audit by releasing the audit thread's semaphore.
          * 
-         * @return the latch that was added
+         * @return the semaphore on which to wait
          * @throws InterruptedException if the thread is interrupted
          */
-        public CountDownLatch startAudit() throws InterruptedException {
-            CountDownLatch latch = new CountDownLatch(1);
-            queue.add(latch);
-
-            return latch;
+        public Semaphore startAudit() throws InterruptedException {
+            auditSem.release();
+            return junitSem;
         }
 
         /**
@@ -567,25 +564,23 @@ public class IntegrityAuditTestBase {
          */
         @Override
         public final void startAuditThread() throws IntegrityAuditException {
-            if (queue != null) {
-                // queue up a bogus latch, in case a thread is still running
-                queue.add(new CountDownLatch(1) {
-                    @Override
-                    public void countDown() {
-                        throw new RuntimeException("auditor has multiple threads");
-                    }
-                });
+            if (auditSem != null) {
+                // release a bunch of semaphores, in case a thread is still running
+                auditSem.release(1000);
             }
+            
+            auditSem = new Semaphore(0);
+            junitSem = new Semaphore(0);
+            
+            super.startAuditThread();
 
-            queue = new LinkedBlockingQueue<>();
+            if (haveAuditThread()) {
+                // tell the thread it can run
+                auditSem.release();
 
-            if (super.startAuditThread(queue)) {
                 // wait for the thread to start
-                CountDownLatch latch = new CountDownLatch(1);
-                queue.add(latch);
-
                 try {
-                    waitLatch(latch);
+                    waitSem(junitSem);
 
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
@@ -605,5 +600,31 @@ public class IntegrityAuditTestBase {
 
             assertTrue(waitThread(this));
         }
+
+        @Override
+        protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2,
+                        long integrityAuditPeriodMillis2) throws IntegrityAuditException {
+
+            return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this) {
+
+                private Semaphore auditSem = MyIntegrityAudit.this.auditSem;
+                private Semaphore junitSem = MyIntegrityAudit.this.junitSem;
+
+                @Override
+                public void runStarted() throws InterruptedException {
+                    auditSem.acquire();
+                    
+                    junitSem.release();
+                    auditSem.acquire();
+                }
+
+                @Override
+                public void auditCompleted() throws InterruptedException {
+                    junitSem.release();
+                    auditSem.acquire();
+                }
+                
+            };
+        }
     }
 }