Make feature-pooling-dmaap work without filtering
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / PoolingManagerImplTest.java
index 2a0066b..21bd62d 100644 (file)
@@ -25,10 +25,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.contains;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -49,9 +47,7 @@ import org.mockito.ArgumentCaptor;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Heartbeat;
 import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
@@ -83,7 +79,6 @@ public class PoolingManagerImplTest {
     private static final String THE_EVENT = "the event";
 
     private static final Object DECODED_EVENT = new Object();
-    private static final String REQUEST_ID = "my.request.id";
 
     /**
      * Number of dmaap.publish() invocations that should be issued when the manager is
@@ -98,7 +93,6 @@ public class PoolingManagerImplTest {
 
     private PoolingProperties poolProps;
     private ListeningController controller;
-    private ClassExtractors extractors;
     private DmaapManager dmaap;
     private boolean gotDmaap;
     private ScheduledThreadPoolExecutor sched;
@@ -132,7 +126,6 @@ public class PoolingManagerImplTest {
         ser = new Serializer();
         active = new CountDownLatch(1);
 
-        extractors = mock(ClassExtractors.class);
         dmaap = mock(DmaapManager.class);
         gotDmaap = false;
         controller = mock(ListeningController.class);
@@ -140,8 +133,6 @@ public class PoolingManagerImplTest {
         schedCount = 0;
         drools = mock(DroolsController.class);
 
-        when(extractors.extract(DECODED_EVENT)).thenReturn(REQUEST_ID);
-
         when(controller.getName()).thenReturn(MY_CONTROLLER);
         when(controller.getDrools()).thenReturn(drools);
         when(controller.isAlive()).thenReturn(true);
@@ -175,18 +166,6 @@ public class PoolingManagerImplTest {
         assertEquals(mgr.getHost(), st.getHost());
     }
 
-    @Test
-    public void testPoolingManagerImpl_ClassEx() {
-        /*
-         * this controller does not implement TopicListener, which should cause a
-         * ClassCastException
-         */
-        PolicyController ctlr = mock(PolicyController.class);
-
-        assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, ctlr, poolProps, active))
-                        .isInstanceOf(PoolingFeatureRtException.class).hasCauseInstanceOf(ClassCastException.class);
-    }
-
     @Test
     public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
         // throw an exception when we try to create the dmaap manager
@@ -284,23 +263,20 @@ public class PoolingManagerImplTest {
         startMgr();
         mgr.startDistributing(makeAssignments(false));
 
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-        verify(dmaap, times(START_PUB + 1)).publish(any());
+        verify(dmaap, times(START_PUB)).publish(any());
 
         mgr.beforeStop();
 
         verify(dmaap).stopConsumer(mgr);
         verify(sched).shutdownNow();
-        verify(dmaap, times(START_PUB + 2)).publish(any());
+        verify(dmaap, times(START_PUB + 1)).publish(any());
         verify(dmaap).publish(contains("offline"));
 
         assertTrue(mgr.getCurrent() instanceof IdleState);
 
         // verify that next message is handled locally
-        mgr.handle(msg);
-        verify(dmaap, times(START_PUB + 2)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+        verify(dmaap, times(START_PUB + 1)).publish(any());
     }
 
     @Test
@@ -407,19 +383,11 @@ public class PoolingManagerImplTest {
         // start should invoke changeState()
         startMgr();
 
-        int ntimes = 0;
-
-        // should have set the filter for the StartState
-        verify(dmaap, times(++ntimes)).setFilter(any());
-
         /*
          * now go offline while it's locked
          */
         lockMgr();
 
-        // should have set the new filter
-        verify(dmaap, times(++ntimes)).setFilter(any());
-
         // should have cancelled the timers
         assertEquals(2, futures.size());
         verify(futures.poll()).cancel(false);
@@ -430,35 +398,12 @@ public class PoolingManagerImplTest {
          */
         unlockMgr();
 
-        // should have set the new filter
-        verify(dmaap, times(++ntimes)).setFilter(any());
-
         // new timers should now be active
         assertEquals(2, futures.size());
         verify(futures.poll(), never()).cancel(false);
         verify(futures.poll(), never()).cancel(false);
     }
 
-    @Test
-    public void testSetFilter() throws Exception {
-        // start should cause a filter to be set
-        startMgr();
-
-        verify(dmaap).setFilter(any());
-    }
-
-    @Test
-    public void testSetFilter_DmaapEx() throws Exception {
-
-        // generate an exception
-        doThrow(new PoolingFeatureException()).when(dmaap).setFilter(any());
-
-        // start should invoke setFilter()
-        assertThatCode(() -> startMgr()).doesNotThrowAnyException();
-
-        // no exception, means success
-    }
-
     @Test
     public void testSchedule() throws Exception {
         // must start the scheduler
@@ -583,64 +528,35 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testBeforeOffer_Unlocked_NoIntercept() throws Exception {
+    public void testBeforeOffer_Unlocked() throws Exception {
         startMgr();
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
-    }
-
-    @Test
-    public void testBeforeOffer_Locked_NoIntercept() throws Exception {
-        startMgr();
-
-        lockMgr();
+        // route the message to another host
+        mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
-    public void testBeforeOffer_Locked_Intercept() throws Exception {
+    public void testBeforeOffer_Locked() throws Exception {
         startMgr();
         lockMgr();
 
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
-        final CountDownLatch latch = catchRecursion(false);
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+        // route the message to another host
+        mgr.startDistributing(makeAssignments(false));
 
-        // ensure we made it past both beforeXxx() methods
-        assertEquals(0, latch.getCount());
+        assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
-    public void testBeforeInsert_Intercept() throws Exception {
+    public void testBeforeInsert() throws Exception {
         startMgr();
         lockMgr();
 
         // route the message to this host
         mgr.startDistributing(makeAssignments(true));
 
-        final CountDownLatch latch = catchRecursion(true);
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
-        // ensure we made it past both beforeXxx() methods
-        assertEquals(0, latch.getCount());
-    }
-
-    @Test
-    public void testBeforeInsert_NoIntercept() throws Exception {
-        validateUnhandled(CommInfrastructure.UEB);
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
     @Test
@@ -657,17 +573,17 @@ public class PoolingManagerImplTest {
     public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
         startMgr();
 
-        assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
+        assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
     }
 
     @Test
     public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
-        validateUnhandled(CommInfrastructure.UEB);
+        validateUnhandled();
     }
 
     @Test
     public void testHandleExternalForward_NoAssignments() throws Exception {
-        validateUnhandled(CommInfrastructure.UEB);
+        validateUnhandled();
     }
 
     @Test
@@ -678,7 +594,7 @@ public class PoolingManagerImplTest {
     @Test
     public void testHandleEvent_NullTarget() throws Exception {
         // buckets have null targets
-        validateHandled(new BucketAssignments(new String[] {null, null}), START_PUB);
+        validateDiscarded(new BucketAssignments(new String[] {null, null}));
     }
 
     @Test
@@ -687,46 +603,9 @@ public class PoolingManagerImplTest {
     }
 
     @Test
-    public void testHandleEvent_DiffHost_TooManyHops() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(false));
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        msg.setNumHops(PoolingManagerImpl.MAX_HOPS + 1);
-        mgr.handle(msg);
-
-        // shouldn't publish
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-    }
-
-    @Test
-    public void testHandleEvent_DiffHost_Forward() throws Exception {
-        validateHandled(makeAssignments(false), START_PUB + 1);
-    }
-
-    @Test
-    public void testExtractRequestId_NullEvent() throws Exception {
-        startMgr();
-
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, null));
-    }
-
-    @Test
-    public void testExtractRequestId_NullReqId() throws Exception {
-        validateHandleReqId(null);
-    }
-
-    @Test
-    public void testExtractRequestId() throws Exception {
-        startMgr();
-
+    public void testHandleEvent_DiffHost() throws Exception {
         // route the message to the *OTHER* host
-        mgr.startDistributing(makeAssignments(false));
-
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        validateDiscarded(makeAssignments(false));
     }
 
     @Test
@@ -746,7 +625,7 @@ public class PoolingManagerImplTest {
         // create assignments, though they are irrelevant
         mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
@@ -767,7 +646,7 @@ public class PoolingManagerImplTest {
         // create assignments, though they are irrelevant
         mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
@@ -787,7 +666,7 @@ public class PoolingManagerImplTest {
         // create assignments, though they are irrelevant
         mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
@@ -807,7 +686,7 @@ public class PoolingManagerImplTest {
         // create assignments, though they are irrelevant
         mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
@@ -819,101 +698,7 @@ public class PoolingManagerImplTest {
         // route to another host
         mgr.startDistributing(makeAssignments(false));
 
-        assertTrue(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
-    }
-
-    @Test
-    public void testMakeForward() throws Exception {
-        startMgr();
-
-        // route the message to another host
-        mgr.startDistributing(makeAssignments(false));
-
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        verify(dmaap, times(START_PUB + 1)).publish(any());
-    }
-
-    @Test
-    public void testMakeForward_InvalidMsg() throws Exception {
-        startMgr();
-
-        // route the message to another host
-        mgr.startDistributing(makeAssignments(false));
-
-        assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        // should not have tried to publish a message
-        verify(dmaap, times(START_PUB)).publish(any());
-    }
-
-    @Test
-    public void testHandle_SameHost() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-    }
-
-    @Test
-    public void testHandle_DiffHost() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(false));
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB + 1)).publish(any());
-        verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-    }
-
-    @Test
-    public void testInject() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
-        final CountDownLatch latch = catchRecursion(true);
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
-        // ensure we made it past both beforeXxx() methods
-        assertEquals(0, latch.getCount());
-    }
-
-    @Test
-    public void testInject_Ex() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
-        // generate RuntimeException when onTopicEvent() is invoked
-        doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
-
-        final CountDownLatch latch = catchRecursion(true);
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
-        // ensure we made it past both beforeXxx() methods
-        assertEquals(0, latch.getCount());
+        assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
@@ -971,20 +756,18 @@ public class PoolingManagerImplTest {
 
         // null assignments should cause message to be processed locally
         mgr.startDistributing(null);
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
         verify(dmaap, times(START_PUB)).publish(any());
 
 
-        // route the message to this host
+        // message for this host
         mgr.startDistributing(makeAssignments(true));
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-        verify(dmaap, times(START_PUB)).publish(any());
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
 
 
-        // route the message to the other host
+        // message for another host
         mgr.startDistributing(makeAssignments(false));
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-        verify(dmaap, times(START_PUB + 1)).publish(any());
+        assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
     @Test
@@ -1088,9 +871,7 @@ public class PoolingManagerImplTest {
     private void validateHandleReqId(String requestId) throws PoolingFeatureException {
         startMgr();
 
-        when(extractors.extract(any())).thenReturn(requestId);
-
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
     private void validateNoForward() throws PoolingFeatureException {
@@ -1099,67 +880,23 @@ public class PoolingManagerImplTest {
         // route the message to this host
         mgr.startDistributing(makeAssignments(true));
 
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
 
         verify(dmaap, times(START_PUB)).publish(any());
     }
 
-    private void validateHandled(BucketAssignments assignments, int publishCount) throws PoolingFeatureException {
+    private void validateUnhandled() throws PoolingFeatureException {
         startMgr();
-
-        // route the message to the *OTHER* host
-        mgr.startDistributing(assignments);
-
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        verify(dmaap, times(publishCount)).publish(any());
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
-    private void validateUnhandled(CommInfrastructure infra) throws PoolingFeatureException {
+    private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
         startMgr();
-        assertFalse(mgr.beforeInsert(infra, TOPIC2, THE_EVENT, DECODED_EVENT));
-    }
-
-    /**
-     * Configure the mock controller to act like a real controller, invoking beforeOffer
-     * and then beforeInsert, so we can make sure they pass through. We'll keep count to
-     * ensure we don't get into infinite recursion.
-     *
-     * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked,
-     *        {@code false} if it should be skipped
-     *
-     * @return a latch that will be counted down if both beforeXxx() methods return false
-     */
-    private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
-        CountDownLatch recursion = new CountDownLatch(3);
-        CountDownLatch latch = new CountDownLatch(1);
-
-        doAnswer(args -> {
-
-            recursion.countDown();
-            if (recursion.getCount() == 0) {
-                fail("recursive calls to onTopicEvent");
-            }
-
-            int iarg = 0;
-            CommInfrastructure proto = args.getArgument(iarg++);
-            String topic = args.getArgument(iarg++);
-            String event = args.getArgument(iarg++);
-
-            if (mgr.beforeOffer(proto, topic, event)) {
-                return null;
-            }
 
-            if (invokeBeforeInsert && mgr.beforeInsert(proto, topic, event, DECODED_EVENT)) {
-                return null;
-            }
-
-            latch.countDown();
-
-            return null;
-        }).when(controller).onTopicEvent(any(), any(), any());
+        // buckets have null targets
+        mgr.startDistributing(bucketAssignments);
 
-        return latch;
+        assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
     /**
@@ -1170,7 +907,7 @@ public class PoolingManagerImplTest {
      * @return a new bucket assignment
      */
     private BucketAssignments makeAssignments(boolean sameHost) {
-        int slot = REQUEST_ID.hashCode() % 2;
+        int slot = DECODED_EVENT.hashCode() % 2;
 
         // slot numbers are 0 and 1 - reverse them if it's for a different host
         if (!sameHost) {
@@ -1199,6 +936,7 @@ public class PoolingManagerImplTest {
      */
     private void lockMgr() {
         mgr.beforeLock();
+        when(controller.isLocked()).thenReturn(true);
     }
 
     /**
@@ -1206,6 +944,7 @@ public class PoolingManagerImplTest {
      */
     private void unlockMgr() {
         mgr.afterUnlock();
+        when(controller.isLocked()).thenReturn(false);
     }
 
     /**
@@ -1226,11 +965,6 @@ public class PoolingManagerImplTest {
             super(host, controller, props, activeLatch);
         }
 
-        @Override
-        protected ClassExtractors makeClassExtractors(Properties props) {
-            return extractors;
-        }
-
         @Override
         protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
             gotDmaap = true;