Invoke lock callback in session thread 30/98430/2
authorJim Hahn <jrh3@att.com>
Thu, 14 Nov 2019 20:16:13 +0000 (15:16 -0500)
committerJim Hahn <jrh3@att.com>
Thu, 14 Nov 2019 21:42:52 +0000 (16:42 -0500)
Injects the callback as a DroolsRunnable into the session, if
there is one.  Otherwise, it invokes it via the engine's
thread pool.

Issue-ID: POLICY-2246
Signed-off-by: Jim Hahn <jrh3@att.com>
Change-Id: I214480ae675d89e7335dde4eb4abe2684f7ef8ab
Signed-off-by: Jim Hahn <jrh3@att.com>
feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockManager.java
feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/DistributedLockManagerTest.java
policy-management/src/main/java/org/onap/policy/drools/system/internal/FeatureLockImpl.java
policy-management/src/main/java/org/onap/policy/drools/system/internal/LockManager.java
policy-management/src/main/java/org/onap/policy/drools/system/internal/SimpleLockManager.java
policy-management/src/test/java/org/onap/policy/drools/system/internal/FeatureLockImplTest.java
policy-management/src/test/java/org/onap/policy/drools/system/internal/LockManagerTest.java
policy-management/src/test/java/org/onap/policy/drools/system/internal/SimpleLockManagerTest.java

index 7d58b8d..528fa7c 100644 (file)
@@ -341,7 +341,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D
             DistributedLock lock = lockref.get();
             if (lock != null) {
                 logger.debug("removed lock from map {}", lock);
-                lock.deny(DistributedLock.LOCK_LOST_MSG, false);
+                lock.deny(DistributedLock.LOCK_LOST_MSG);
             }
         }
     }
@@ -473,7 +473,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D
                 scheduleRequest(this::doExtend);
 
             } else {
-                deny(NOT_LOCKED_MSG, true);
+                deny(NOT_LOCKED_MSG);
             }
         }
 
@@ -639,7 +639,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D
                 }
 
                 if (success) {
-                    grant(true);
+                    grant();
                     return;
                 }
             }
@@ -690,7 +690,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D
                  * the record, thus we have to try to insert, if the update fails
                  */
                 if (doDbUpdate(conn) || doDbInsert(conn)) {
-                    grant(true);
+                    grant();
                     return;
                 }
             }
@@ -790,7 +790,7 @@ public class DistributedLockManager extends LockManager<DistributedLockManager.D
 
             synchronized (this) {
                 if (!isUnavailable()) {
-                    deny(LOCK_LOST_MSG, true);
+                    deny(LOCK_LOST_MSG);
                 }
             }
         }
index c996d8d..5351f00 100644 (file)
@@ -68,11 +68,13 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.kie.api.runtime.KieSession;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.onap.policy.common.utils.services.OrderedServiceImpl;
 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
+import org.onap.policy.drools.core.PolicySession;
 import org.onap.policy.drools.core.lock.Lock;
 import org.onap.policy.drools.core.lock.LockCallback;
 import org.onap.policy.drools.core.lock.LockState;
@@ -119,6 +121,9 @@ public class DistributedLockManagerTest {
     @Mock
     private PolicyEngine engine;
 
+    @Mock
+    private KieSession kieSess;
+
     @Mock
     private ScheduledExecutorService exsvc;
 
@@ -132,6 +137,7 @@ public class DistributedLockManagerTest {
     private BasicDataSource datasrc;
 
     private DistributedLock lock;
+    private PolicySession session;
 
     private AtomicInteger nactive;
     private AtomicInteger nsuccesses;
@@ -180,6 +186,16 @@ public class DistributedLockManagerTest {
     public void setUp() throws SQLException {
         MockitoAnnotations.initMocks(this);
 
+        // grant() and deny() calls will come through here and be immediately executed
+        session = new PolicySession(null, null, kieSess) {
+            @Override
+            public void insertDrools(Object object) {
+                ((Runnable) object).run();
+            }
+        };
+
+        session.setPolicySession();
+
         nactive = new AtomicInteger(0);
         nsuccesses = new AtomicInteger(0);
 
@@ -443,9 +459,9 @@ public class DistributedLockManagerTest {
         assertTrue(lock5.isUnavailable());
 
         // allow callbacks
-        runLock(5, 2);
-        runLock(6, 1);
-        runLock(7, 0);
+        runLock(2, 2);
+        runLock(3, 1);
+        runLock(4, 0);
         verify(callback).lockUnavailable(lock);
         verify(callback3).lockUnavailable(lock3);
         verify(callback5).lockUnavailable(lock5);
@@ -565,8 +581,8 @@ public class DistributedLockManagerTest {
         assertTrue(lock3.isWaiting());
         assertTrue(lock4.isUnavailable());
 
-        runLock(5, 0);
-        verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());
+        runLock(4, 0);
+        verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
 
         verify(callback).lockUnavailable(lock);
         verify(callback2, never()).lockUnavailable(lock2);
index d4e4f5f..5690b18 100644 (file)
@@ -21,6 +21,8 @@
 package org.onap.policy.drools.system.internal;
 
 import java.util.concurrent.ScheduledExecutorService;
+import org.onap.policy.drools.core.DroolsRunnable;
+import org.onap.policy.drools.core.PolicySession;
 import org.onap.policy.drools.core.lock.LockCallback;
 import org.onap.policy.drools.core.lock.LockImpl;
 import org.onap.policy.drools.core.lock.LockState;
@@ -66,13 +68,9 @@ public abstract class FeatureLockImpl extends LockImpl {
     }
 
     /**
-     * Grants this lock. The notification is <i>always</i> invoked via a background
-     * thread.
-     *
-     * @param foreground {@code true} if to invoke the callback in the foreground thread,
-     *        {@code false} otherwise
+     * Grants this lock.
      */
-    protected synchronized void grant(boolean foreground) {
+    protected synchronized void grant() {
         if (isUnavailable()) {
             return;
         }
@@ -81,32 +79,37 @@ public abstract class FeatureLockImpl extends LockImpl {
         updateGrant();
 
         logger.info("lock granted: {}", this);
-
-        if (foreground) {
-            notifyAvailable();
-        } else {
-            getThreadPool().execute(this::notifyAvailable);
-        }
+        doNotify(this::notifyAvailable);
     }
 
     /**
      * Permanently denies this lock.
      *
      * @param reason the reason the lock was denied
-     * @param foreground {@code true} if to invoke the callback in the foreground thread,
-     *        {@code false} otherwise
      */
-    public void deny(String reason, boolean foreground) {
+    public void deny(String reason) {
         synchronized (this) {
             setState(LockState.UNAVAILABLE);
         }
 
         logger.info("{}: {}", reason, this);
+        doNotify(this::notifyUnavailable);
+    }
+
+    /**
+     * Notifies the session of a change in the lock state. If a session is attached, then
+     * it simply injects the notifier into the session. Otherwise, it executes it via a
+     * background thread.
+     *
+     * @param notifier function to invoke the callback
+     */
+    private void doNotify(DroolsRunnable notifier) {
+        PolicySession sess = getSession();
+        if (sess != null) {
+            sess.insertDrools(notifier);
 
-        if (foreground) {
-            notifyUnavailable();
         } else {
-            getThreadPool().execute(this::notifyUnavailable);
+            getThreadPool().execute(notifier);
         }
     }
 
@@ -164,7 +167,7 @@ public abstract class FeatureLockImpl extends LockImpl {
 
         // do a quick check of the state
         if (isUnavailable() || !attachFeature()) {
-            deny(LOCK_LOST_MSG, true);
+            deny(LOCK_LOST_MSG);
             return false;
         }
 
@@ -200,12 +203,13 @@ public abstract class FeatureLockImpl extends LockImpl {
      */
     protected abstract boolean addToFeature();
 
-    /**
-     * Gets the thread pool.
-     *
-     * @return the thread pool
-     */
+    // these may be overridden by junit tests
+
     protected ScheduledExecutorService getThreadPool() {
         return PolicyEngineConstants.getManager().getExecutorService();
     }
+
+    protected PolicySession getSession() {
+        return PolicySession.getCurrentSession();
+    }
 }
index 7e4505b..ef6b48d 100644 (file)
@@ -156,7 +156,7 @@ public abstract class LockManager<T extends FeatureLockImpl> implements PolicyRe
             logger.debug("added lock to map {}", lock);
             finishLock(lock);
         } else {
-            lock.deny("resource is busy", true);
+            lock.deny("resource is busy");
         }
 
         return lock;
index 839c17d..a62d766 100644 (file)
@@ -158,14 +158,14 @@ public class SimpleLockManager extends LockManager<SimpleLockManager.SimpleLock>
 
             SimpleLock lock = lockref.get();
             if (lock != null) {
-                lock.deny("lock expired", false);
+                lock.deny("lock expired");
             }
         }
     }
 
     @Override
     protected void finishLock(SimpleLock lock) {
-        lock.grant(true);
+        lock.grant();
     }
 
     @Override
@@ -257,9 +257,9 @@ public class SimpleLockManager extends LockManager<SimpleLockManager.SimpleLock>
             }
 
             if (resource2lock.get(getResourceId()) == this) {
-                grant(true);
+                grant();
             } else {
-                deny(NOT_LOCKED_MSG, true);
+                deny(NOT_LOCKED_MSG);
             }
         }
 
index 258ee0c..a224a63 100644 (file)
@@ -46,6 +46,8 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.onap.policy.drools.core.DroolsRunnable;
+import org.onap.policy.drools.core.PolicySession;
 import org.onap.policy.drools.core.lock.LockCallback;
 import org.onap.policy.drools.core.lock.LockState;
 import org.onap.policy.drools.system.PolicyEngineConstants;
@@ -126,30 +128,10 @@ public class FeatureLockImplTest {
         assertEquals(HOLD_SEC, lock.getHoldSec());
     }
 
-    /**
-     * Tests grant(), when using the foreground thread.
-     */
-    @Test
-    public void testGrantForeground() {
-        MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback);
-        lock.grant(true);
-
-        assertTrue(lock.isActive());
-        assertEquals(1, lock.nupdates);
-
-        verify(exsvc, never()).execute(any());
-
-        verify(callback).lockAvailable(any());
-        verify(callback, never()).lockUnavailable(any());
-    }
-
-    /**
-     * Tests grant(), when using the background thread.
-     */
     @Test
-    public void testGrantBackground() {
+    public void testGrant() {
         MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback);
-        lock.grant(false);
+        lock.grant();
 
         assertTrue(lock.isActive());
         assertEquals(1, lock.nupdates);
@@ -166,7 +148,7 @@ public class FeatureLockImplTest {
     public void testGrantUnavailable() {
         MyLock lock = new MyLock(LockState.UNAVAILABLE, RESOURCE, OWNER_KEY, HOLD_SEC, callback);
         lock.setState(LockState.UNAVAILABLE);
-        lock.grant(true);
+        lock.grant();
 
         assertTrue(lock.isUnavailable());
         assertEquals(0, lock.nupdates);
@@ -174,35 +156,65 @@ public class FeatureLockImplTest {
         verify(exsvc, never()).execute(any());
     }
 
-    /**
-     * Tests deny(), when using the foreground thread.
-     */
     @Test
-    public void testDenyForeground() {
+    public void testDeny() {
         MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback);
-        lock.deny("my reason", true);
+        lock.deny("my reason");
 
         assertTrue(lock.isUnavailable());
 
-        verify(exsvc, never()).execute(any());
-
+        invokeCallback(1);
         verify(callback, never()).lockAvailable(any());
         verify(callback).lockUnavailable(any());
     }
 
     /**
-     * Tests deny(), when using the background thread.
+     * Tests doNotify() when a session exists.
      */
     @Test
-    public void testDenyBackground() {
+    public void testDoNotifySession() {
+        PolicySession session = mock(PolicySession.class);
+
+        MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback) {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected PolicySession getSession() {
+                return session;
+            }
+        };
+
+        lock.grant();
+
+        assertTrue(lock.isActive());
+        assertEquals(1, lock.nupdates);
+
+        verify(exsvc, never()).execute(any());
+
+        ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
+        verify(session).insertDrools(captor.capture());
+
+        DroolsRunnable runner = (DroolsRunnable) captor.getValue();
+        runner.run();
+
+        verify(callback).lockAvailable(any());
+        verify(callback, never()).lockUnavailable(any());
+    }
+
+    /**
+     * Tests doNotify() when there is no session.
+     */
+    @Test
+    public void testDoNotifyNoSession() {
         MyLock lock = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback);
-        lock.deny("my reason", false);
+        lock.grant();
 
-        assertTrue(lock.isUnavailable());
+        assertTrue(lock.isActive());
+        assertEquals(1, lock.nupdates);
 
         invokeCallback(1);
-        verify(callback, never()).lockAvailable(any());
-        verify(callback).lockUnavailable(any());
+        verify(callback).lockAvailable(any());
+        verify(callback, never()).lockUnavailable(any());
     }
 
     @Test
@@ -283,6 +295,7 @@ public class FeatureLockImplTest {
         assertEquals(HOLD_SEC2, lock.getHoldSec());
         assertSame(scallback, lock.getCallback());
 
+        invokeCallback(1);
         verify(scallback, never()).lockAvailable(lock);
         verify(scallback).lockUnavailable(lock);
     }
@@ -324,10 +337,19 @@ public class FeatureLockImplTest {
         assertEquals(HOLD_SEC2, lock.getHoldSec());
         assertSame(scallback, lock.getCallback());
 
+        invokeCallback(1);
         verify(scallback, never()).lockAvailable(lock);
         verify(scallback).lockUnavailable(lock);
     }
 
+    @Test
+    public void testGetSession() {
+        MyLockStdSession lock = new MyLockStdSession(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback);
+
+        // this should invoke the real policy session without throwing an exception
+        lock.grant();
+    }
+
     @Test
     public void testToString() {
         String text = new MyLock(LockState.WAITING, RESOURCE, OWNER_KEY, HOLD_SEC, callback).toString();
@@ -361,15 +383,19 @@ public class FeatureLockImplTest {
         }
     }
 
-    public static class MyLock extends FeatureLockImpl {
+    /**
+     * Lock that inherits the normal getSession() method.
+     */
+    public static class MyLockStdSession extends FeatureLockImpl {
         private static final long serialVersionUID = 1L;
-        private int nupdates = 0;
+        protected int nupdates = 0;
 
-        public MyLock() {
+        public MyLockStdSession() {
             super();
         }
 
-        public MyLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback) {
+        public MyLockStdSession(LockState state, String resourceId, String ownerKey, int holdSec,
+                        LockCallback callback) {
             super(state, resourceId, ownerKey, holdSec, callback);
         }
 
@@ -395,6 +421,23 @@ public class FeatureLockImplTest {
         }
     }
 
+    public static class MyLock extends MyLockStdSession {
+        private static final long serialVersionUID = 1L;
+
+        public MyLock() {
+            super();
+        }
+
+        public MyLock(LockState state, String resourceId, String ownerKey, int holdSec, LockCallback callback) {
+            super(state, resourceId, ownerKey, holdSec, callback);
+        }
+
+        @Override
+        protected PolicySession getSession() {
+            return null;
+        }
+    }
+
     public static class MyLockNoFeature extends MyLock {
         private static final long serialVersionUID = 1L;
 
index 1cda079..6b6e736 100644 (file)
@@ -208,7 +208,7 @@ public class LockManagerTest {
 
         @Override
         protected void finishLock(MyLock lock) {
-            lock.grant(true);
+            lock.grant();
         }
 
         @Override
index 79e2067..b1c34c2 100644 (file)
@@ -55,11 +55,13 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.kie.api.runtime.KieSession;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.onap.policy.common.utils.time.CurrentTime;
 import org.onap.policy.common.utils.time.TestTime;
+import org.onap.policy.drools.core.PolicySession;
 import org.onap.policy.drools.core.lock.Lock;
 import org.onap.policy.drools.core.lock.LockCallback;
 import org.onap.policy.drools.core.lock.LockState;
@@ -85,11 +87,15 @@ public class SimpleLockManagerTest {
     private static ScheduledExecutorService saveExec;
     private static ScheduledExecutorService realExec;
 
+    private PolicySession session;
     private TestTime testTime;
     private AtomicInteger nactive;
     private AtomicInteger nsuccesses;
     private SimpleLockManager feature;
 
+    @Mock
+    private KieSession kieSess;
+
     @Mock
     private ScheduledExecutorService exsvc;
 
@@ -130,6 +136,16 @@ public class SimpleLockManagerTest {
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
+        // grant() and deny() calls will come through here and be immediately executed
+        session = new PolicySession(null, null, kieSess) {
+            @Override
+            public void insertDrools(Object object) {
+                ((Runnable) object).run();
+            }
+        };
+
+        session.setPolicySession();
+
         testTime = new TestTime();
         nactive = new AtomicInteger(0);
         nsuccesses = new AtomicInteger(0);
@@ -252,10 +268,6 @@ public class SimpleLockManagerTest {
         assertFalse(lock2.isActive());
         assertTrue(lock3.isActive());
 
-        // run the callbacks
-        captor = ArgumentCaptor.forClass(Runnable.class);
-        verify(exsvc, times(2)).execute(captor.capture());
-        captor.getAllValues().forEach(Runnable::run);
         verify(callback).lockUnavailable(lock);
         verify(callback).lockUnavailable(lock2);
         verify(callback, never()).lockUnavailable(lock3);
@@ -272,10 +284,6 @@ public class SimpleLockManagerTest {
         checker.run();
         assertFalse(lock3.isActive());
 
-        // run the callback
-        captor = ArgumentCaptor.forClass(Runnable.class);
-        verify(exsvc, times(3)).execute(captor.capture());
-        captor.getValue().run();
         verify(callback).lockUnavailable(lock3);
     }
 
@@ -433,7 +441,7 @@ public class SimpleLockManagerTest {
     @Test
     public void testSimpleLockExpired() {
         SimpleLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
-        lock.grant(true);
+        lock.grant();
 
         assertFalse(lock.expired(testTime.getMillis()));
         assertFalse(lock.expired(testTime.getMillis() + HOLD_MS - 1));