import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.persistence.EntityManager;
import javax.persistence.Persistence;
import javax.persistence.Query;
import javax.validation.constraints.NotNull;
-
import org.onap.policy.common.im.jmx.ComponentAdmin;
import org.onap.policy.common.im.jmx.ComponentAdminMBean;
import org.onap.policy.common.im.jmx.JmxAgentConnection;
*/
protected IntegrityMonitor(String resourceName, Properties properties) throws IntegrityMonitorException {
- this(resourceName, properties, null);
+ this(resourceName, properties, new Factory());
}
/**
*
* @param resourceName The resource name of the resource
* @param properties a set of properties passed in from the resource
- * @param queue queue to use to control the FPManager thread, or {@code null}
+ * @param factory Factory to use to control the FPManager thread
* @throws IntegrityMonitorException if any errors are encountered in the constructor
*/
- protected IntegrityMonitor(String resourceName, Properties properties, BlockingQueue<CountDownLatch> queue)
+ protected IntegrityMonitor(String resourceName, Properties properties, Factory factory)
throws IntegrityMonitorException {
// singleton check since this constructor can be called from a child or
logger.error("ComponentAdmin constructor exception: {}", e.toString(), e);
}
- fpManager = new FpManager(queue);
+ fpManager = new FpManager(factory);
+ fpManager.start();
}
*/
public static IntegrityMonitor getInstance(String resourceName, Properties properties)
throws IntegrityMonitorException {
- return getInstance(resourceName, properties, null);
+ return getInstance(resourceName, properties, new Factory());
}
/**
*
* @param resourceName The resource name of the resource
* @param properties a set of properties passed in from the resource
- * @param queue queue to use to control the FPManager thread, or {@code null}
+ * @param factory Factory to use to control the FPManager thread
* @return The new instance of IntegrityMonitor
* @throws IntegrityMonitorException if unable to create jmx url or the constructor returns an
* exception
*/
protected static IntegrityMonitor getInstance(String resourceName, Properties properties,
- BlockingQueue<CountDownLatch> queue) throws IntegrityMonitorException {
+ Factory factory) throws IntegrityMonitorException {
synchronized (getInstanceLock) {
logger.debug("getInstance() called - resourceName= {}", resourceName);
if (instance == null) {
logger.debug("Creating new instance of IntegrityMonitor");
- instance = new IntegrityMonitor(resourceName, properties, queue);
+ instance = new IntegrityMonitor(resourceName, properties, factory);
}
return instance;
}
* dependencies, does a refresh state audit and runs the stateAudit.
*/
class FpManager extends Thread {
- private final CountDownLatch stopper = new CountDownLatch(1);
+ private boolean stopRequested = false;
- private BlockingQueue<CountDownLatch> queue;
- private CountDownLatch progressLatch = null;
+ private final Factory factory;
// Constructor - start FP manager thread
- FpManager(BlockingQueue<CountDownLatch> queue) {
- this.queue = queue;
+ FpManager(Factory factory) {
+ this.factory = factory;
// set now as the last time the refreshStateAudit ran
IntegrityMonitor.this.refreshStateAuditLastRunDate = new Date();
- // start thread
- this.start();
}
@Override
logger.debug("FPManager thread running");
try {
- getLatch();
- decrementLatch();
+ factory.runStarted();
- while (!stopper.await(cycleIntervalMillis, TimeUnit.MILLISECONDS)) {
- getLatch();
+ while(!stopRequested) {
+ factory.doSleep(cycleIntervalMillis);
+
IntegrityMonitor.this.runOnce();
- decrementLatch();
+ factory.monitorCompleted();
}
} catch (InterruptedException e) {
}
public void stopAndExit() {
- stopper.countDown();
+ stopRequested = true;
this.interrupt();
}
-
- /**
- * Gets the next latch from the queue.
- *
- * @throws InterruptedException
- *
- */
- private void getLatch() throws InterruptedException {
- if (queue != null) {
- progressLatch = queue.take();
- }
- }
-
- /**
- * Decrements the current latch.
- */
- private void decrementLatch() {
- if (progressLatch != null) {
- progressLatch.countDown();
- }
- }
-
}
private void runOnce() {
return allNotWellMap;
}
+ /**
+ * Used to access various objects. Overridden by junit tests.
+ */
+ public static class Factory {
+
+ /**
+ * Indicates that the {@link FpManager#run()} method has started. This method
+ * simply returns.
+ *
+ * @throws InterruptedException
+ */
+ public void runStarted() throws InterruptedException {
+ // does nothing
+ }
+
+ /**
+ * Sleeps for a period of time.
+ * @param sleepMs amount of time to sleep, in milliseconds
+ * @throws InterruptedException
+ */
+ public void doSleep(long sleepMs) throws InterruptedException {
+ Thread.sleep(sleepMs);
+ }
+
+ /**
+ * Indicates that a monitor activity has completed. This method simply returns.
+ *
+ * @throws InterruptedException
+ */
+ public void monitorCompleted() throws InterruptedException {
+ // does nothing
+ }
+ }
+
/*
* The remaining methods are used by JUnit tests.
*/
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-
import java.util.Date;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import java.util.concurrent.Semaphore;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.persistence.TemporalType;
-
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.onap.policy.common.im.IntegrityMonitor.Factory;
import org.onap.policy.common.im.jpa.ForwardProgressEntity;
import org.onap.policy.common.im.jpa.ResourceRegistrationEntity;
import org.onap.policy.common.im.jpa.StateManagementEntity;
private static EntityTransaction et;
private static String resourceName;
- private BlockingQueue<CountDownLatch> queue;
+ private Semaphore monitorSem;
+ private Semaphore junitSem;
/**
* Set up for test class.
private IntegrityMonitor makeMonitor(String resourceName, Properties myProp) throws Exception {
IntegrityMonitor.deleteInstance();
- queue = new LinkedBlockingQueue<>();
+ monitorSem = new Semaphore(0);
+ junitSem = new Semaphore(0);
+
+ Factory factory = new IntegrityMonitor.Factory() {
+
+ @Override
+ public void doSleep(long sleepMs) throws InterruptedException {
+ /*
+ * No need to sleep, as the thread won't progress until the
+ * semaphore is released.
+ */
+ }
+
+ @Override
+ public void runStarted() throws InterruptedException {
+ monitorSem.acquire();
+
+ junitSem.release();
+ monitorSem.acquire();
+ }
+
+ @Override
+ public void monitorCompleted() throws InterruptedException {
+ junitSem.release();
+ monitorSem.acquire();
+ }
+
+ };
- IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, queue);
+ IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, factory);
// wait for the monitor thread to start
waitStep();
* @throws InterruptedException if the thread is interrupted
*/
private void waitStep() throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(1);
- queue.offer(latch);
- waitLatch(latch);
+ monitorSem.release();
+ waitSem(junitSem);
}
}
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
-
import org.onap.policy.common.utils.jpa.EntityTransCloser;
import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
import org.slf4j.Logger;
}
/**
- * Waits for a latch to reach zero.
+ * Waits for a semaphore to be acquired
*
- * @param latch the latch
+ * @param sem the latch
* @throws InterruptedException if the thread is interrupted
* @throws AssertionError if the latch did not reach zero in the allotted time
*/
- protected void waitLatch(CountDownLatch latch) throws InterruptedException {
- assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS));
+ protected void waitSem(Semaphore sem) throws InterruptedException {
+ assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.MILLISECONDS));
}
/**