2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.ArgumentMatchers.contains;
30 import static org.mockito.Mockito.doThrow;
31 import static org.mockito.Mockito.mock;
32 import static org.mockito.Mockito.never;
33 import static org.mockito.Mockito.times;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
37 import java.util.LinkedList;
38 import java.util.Properties;
39 import java.util.Queue;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ScheduledFuture;
42 import java.util.concurrent.ScheduledThreadPoolExecutor;
43 import java.util.concurrent.TimeUnit;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
48 import org.onap.policy.common.endpoints.event.comm.TopicListener;
49 import org.onap.policy.drools.controller.DroolsController;
50 import org.onap.policy.drools.pooling.message.BucketAssignments;
51 import org.onap.policy.drools.pooling.message.Heartbeat;
52 import org.onap.policy.drools.pooling.message.Message;
53 import org.onap.policy.drools.pooling.message.Offline;
54 import org.onap.policy.drools.pooling.state.ActiveState;
55 import org.onap.policy.drools.pooling.state.IdleState;
56 import org.onap.policy.drools.pooling.state.InactiveState;
57 import org.onap.policy.drools.pooling.state.QueryState;
58 import org.onap.policy.drools.pooling.state.StartState;
59 import org.onap.policy.drools.pooling.state.State;
60 import org.onap.policy.drools.system.PolicyController;
62 public class PoolingManagerImplTest {
64 protected static final long STD_HEARTBEAT_WAIT_MS = 10;
65 protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
66 protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
67 protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
68 protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
69 protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
71 private static final String MY_HOST = "my.host";
72 private static final String HOST2 = "other.host";
74 private static final String MY_CONTROLLER = "my.controller";
75 private static final String MY_TOPIC = "my.topic";
77 private static final String TOPIC2 = "topic.two";
79 private static final String THE_EVENT = "the event";
81 private static final Object DECODED_EVENT = new Object();
84 * Number of dmaap.publish() invocations that should be issued when the manager is
87 private static final int START_PUB = 1;
90 * Futures that have been allocated due to calls to scheduleXxx().
92 private Queue<ScheduledFuture<?>> futures;
94 private PoolingProperties poolProps;
95 private ListeningController controller;
96 private DmaapManager dmaap;
97 private boolean gotDmaap;
98 private ScheduledThreadPoolExecutor sched;
99 private int schedCount;
100 private DroolsController drools;
101 private Serializer ser;
102 private CountDownLatch active;
104 private PoolingManagerImpl mgr;
109 * @throws Exception throws exception
112 public void setUp() throws Exception {
113 Properties plainProps = new Properties();
115 poolProps = mock(PoolingProperties.class);
116 when(poolProps.getSource()).thenReturn(plainProps);
117 when(poolProps.getPoolingTopic()).thenReturn(MY_TOPIC);
118 when(poolProps.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
119 when(poolProps.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
120 when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
121 when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
122 when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
123 when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
125 futures = new LinkedList<>();
126 ser = new Serializer();
127 active = new CountDownLatch(1);
129 dmaap = mock(DmaapManager.class);
131 controller = mock(ListeningController.class);
132 sched = mock(ScheduledThreadPoolExecutor.class);
134 drools = mock(DroolsController.class);
136 when(controller.getName()).thenReturn(MY_CONTROLLER);
137 when(controller.getDrools()).thenReturn(drools);
138 when(controller.isAlive()).thenReturn(true);
140 when(sched.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))).thenAnswer(args -> {
141 ScheduledFuture<?> fut = mock(ScheduledFuture.class);
147 when(sched.scheduleWithFixedDelay(any(Runnable.class), any(Long.class), any(Long.class), any(TimeUnit.class)))
148 .thenAnswer(args -> {
149 ScheduledFuture<?> fut = mock(ScheduledFuture.class);
155 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active);
159 public void testPoolingManagerImpl() throws Exception {
160 assertTrue(gotDmaap);
162 State st = mgr.getCurrent();
163 assertTrue(st instanceof IdleState);
165 // ensure the state is attached to the manager
166 assertEquals(mgr.getHost(), st.getHost());
170 public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
171 // throw an exception when we try to create the dmaap manager
172 PoolingFeatureException ex = new PoolingFeatureException();
174 assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
176 protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
179 }).isInstanceOf(PoolingFeatureRtException.class).hasCause(ex);
183 public void testGetCurrent() throws Exception {
184 assertEquals(IdleState.class, mgr.getCurrent().getClass());
188 assertEquals(StartState.class, mgr.getCurrent().getClass());
192 public void testGetHost() {
193 assertEquals(MY_HOST, mgr.getHost());
195 mgr = new PoolingManagerTest(HOST2, controller, poolProps, active);
196 assertEquals(HOST2, mgr.getHost());
200 public void testGetTopic() {
201 assertEquals(MY_TOPIC, mgr.getTopic());
205 public void testGetProperties() {
206 assertEquals(poolProps, mgr.getProperties());
210 public void testBeforeStart() throws Exception {
214 verify(dmaap).startPublisher();
216 assertEquals(1, schedCount);
217 verify(sched).setMaximumPoolSize(1);
218 verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
221 // try again - nothing should happen
224 verify(dmaap).startPublisher();
226 assertEquals(1, schedCount);
227 verify(sched).setMaximumPoolSize(1);
228 verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
232 public void testAfterStart() throws Exception {
235 verify(dmaap).startConsumer(mgr);
237 State st = mgr.getCurrent();
238 assertTrue(st instanceof StartState);
240 // ensure the state is attached to the manager
241 assertEquals(mgr.getHost(), st.getHost());
243 ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
244 ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
245 verify(sched).schedule(any(Runnable.class), timeCap.capture(), unitCap.capture());
247 assertEquals(STD_HEARTBEAT_WAIT_MS, timeCap.getValue().longValue());
248 assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
251 // already started - nothing else happens
254 verify(dmaap).startConsumer(mgr);
256 assertTrue(mgr.getCurrent() instanceof StartState);
258 verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
262 public void testBeforeStop() throws Exception {
264 mgr.startDistributing(makeAssignments(false));
266 verify(dmaap, times(START_PUB)).publish(any());
270 verify(dmaap).stopConsumer(mgr);
271 verify(sched).shutdownNow();
272 verify(dmaap, times(START_PUB + 1)).publish(any());
273 verify(dmaap).publish(contains("offline"));
275 assertTrue(mgr.getCurrent() instanceof IdleState);
277 // verify that next message is handled locally
278 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
279 verify(dmaap, times(START_PUB + 1)).publish(any());
283 public void testBeforeStop_NotRunning() throws Exception {
284 final State st = mgr.getCurrent();
288 verify(dmaap, never()).stopConsumer(any());
289 verify(sched, never()).shutdownNow();
291 // hasn't changed states either
292 assertEquals(st, mgr.getCurrent());
296 public void testBeforeStop_AfterPartialStart() throws Exception {
297 // call beforeStart but not afterStart
300 final State st = mgr.getCurrent();
304 // should still shut the scheduler down
305 verify(sched).shutdownNow();
307 verify(dmaap, never()).stopConsumer(any());
309 // hasn't changed states
310 assertEquals(st, mgr.getCurrent());
314 public void testAfterStop() throws Exception {
320 verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
324 public void testBeforeLock() throws Exception {
329 assertTrue(mgr.getCurrent() instanceof IdleState);
333 public void testAfterUnlock_AliveIdle() throws Exception {
334 // this really shouldn't happen
340 // stays in idle state, because it has no scheduler
341 assertTrue(mgr.getCurrent() instanceof IdleState);
345 public void testAfterUnlock_AliveStarted() throws Exception {
351 assertTrue(mgr.getCurrent() instanceof StartState);
355 public void testAfterUnlock_StoppedIdle() throws Exception {
359 // controller is stopped
360 when(controller.isAlive()).thenReturn(false);
364 assertTrue(mgr.getCurrent() instanceof IdleState);
368 public void testAfterUnlock_StoppedStarted() throws Exception {
371 // Note: don't lockMgr()
373 // controller is stopped
374 when(controller.isAlive()).thenReturn(false);
378 assertTrue(mgr.getCurrent() instanceof StartState);
382 public void testChangeState() throws Exception {
383 // start should invoke changeState()
387 * now go offline while it's locked
391 // should have cancelled the timers
392 assertEquals(2, futures.size());
393 verify(futures.poll()).cancel(false);
394 verify(futures.poll()).cancel(false);
401 // new timers should now be active
402 assertEquals(2, futures.size());
403 verify(futures.poll(), never()).cancel(false);
404 verify(futures.poll(), never()).cancel(false);
408 public void testSchedule() throws Exception {
409 // must start the scheduler
412 CountDownLatch latch = new CountDownLatch(1);
414 mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
420 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
421 ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
422 ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
424 verify(sched, times(2)).schedule(taskCap.capture(), timeCap.capture(), unitCap.capture());
426 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
427 assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
430 taskCap.getValue().run();
432 assertEquals(0, latch.getCount());
436 public void testScheduleWithFixedDelay() throws Exception {
437 // must start the scheduler
440 CountDownLatch latch = new CountDownLatch(1);
442 mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
448 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
449 ArgumentCaptor<Long> initCap = ArgumentCaptor.forClass(Long.class);
450 ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
451 ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
453 verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
456 assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
457 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
458 assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
461 taskCap.getValue().run();
463 assertEquals(0, latch.getCount());
467 public void testPublishAdmin() throws Exception {
468 Offline msg = new Offline(mgr.getHost());
469 mgr.publishAdmin(msg);
471 assertEquals(Message.ADMIN, msg.getChannel());
473 verify(dmaap).publish(any());
477 public void testPublish() throws Exception {
478 Offline msg = new Offline(mgr.getHost());
479 mgr.publish("my.channel", msg);
481 assertEquals("my.channel", msg.getChannel());
483 verify(dmaap).publish(any());
487 public void testPublish_InvalidMsg() throws Exception {
488 // message is missing data
489 mgr.publish(Message.ADMIN, new Offline());
491 // should not have attempted to publish it
492 verify(dmaap, never()).publish(any());
496 public void testPublish_DmaapEx() throws Exception {
498 // generate exception
499 doThrow(new PoolingFeatureException()).when(dmaap).publish(any());
501 assertThatCode(() -> mgr.publish(Message.ADMIN, new Offline(mgr.getHost()))).doesNotThrowAnyException();
505 public void testOnTopicEvent() throws Exception {
508 StartState st = (StartState) mgr.getCurrent();
511 * give it its heart beat, that should cause it to transition to the Query state.
513 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
514 hb.setChannel(Message.ADMIN);
516 String msg = ser.encodeMsg(hb);
518 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
520 assertTrue(mgr.getCurrent() instanceof QueryState);
524 public void testOnTopicEvent_NullEvent() throws Exception {
527 assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException();
531 public void testBeforeOffer_Unlocked() throws Exception {
534 // route the message to another host
535 mgr.startDistributing(makeAssignments(false));
537 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
541 public void testBeforeOffer_Locked() throws Exception {
545 // route the message to another host
546 mgr.startDistributing(makeAssignments(false));
548 assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
552 public void testBeforeInsert() throws Exception {
556 // route the message to this host
557 mgr.startDistributing(makeAssignments(true));
559 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
563 public void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
564 validateHandleReqId(null);
568 public void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
569 validateHandleReqId("");
573 public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
576 assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
580 public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
585 public void testHandleExternalForward_NoAssignments() throws Exception {
590 public void testHandleExternalForward() throws Exception {
595 public void testHandleEvent_NullTarget() throws Exception {
596 // buckets have null targets
597 validateDiscarded(new BucketAssignments(new String[] {null, null}));
601 public void testHandleEvent_SameHost() throws Exception {
606 public void testHandleEvent_DiffHost() throws Exception {
607 // route the message to the *OTHER* host
608 validateDiscarded(makeAssignments(false));
612 public void testDecodeEvent_CannotDecode() throws Exception {
614 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
616 protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
623 when(controller.isLocked()).thenReturn(true);
625 // create assignments, though they are irrelevant
626 mgr.startDistributing(makeAssignments(false));
628 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
632 public void testDecodeEvent_UnsuppEx() throws Exception {
634 // generate exception
635 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
637 protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
638 throw new UnsupportedOperationException();
644 when(controller.isLocked()).thenReturn(true);
646 // create assignments, though they are irrelevant
647 mgr.startDistributing(makeAssignments(false));
649 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
653 public void testDecodeEvent_ArgEx() throws Exception {
654 // generate exception
655 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
657 protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
658 throw new IllegalArgumentException();
664 when(controller.isLocked()).thenReturn(true);
666 // create assignments, though they are irrelevant
667 mgr.startDistributing(makeAssignments(false));
669 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
673 public void testDecodeEvent_StateEx() throws Exception {
674 // generate exception
675 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
677 protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
678 throw new IllegalStateException();
684 when(controller.isLocked()).thenReturn(true);
686 // create assignments, though they are irrelevant
687 mgr.startDistributing(makeAssignments(false));
689 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
693 public void testDecodeEvent() throws Exception {
696 when(controller.isLocked()).thenReturn(true);
698 // route to another host
699 mgr.startDistributing(makeAssignments(false));
701 assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
705 public void testHandleInternal() throws Exception {
708 StartState st = (StartState) mgr.getCurrent();
711 * give it its heart beat, that should cause it to transition to the Query state.
713 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
714 hb.setChannel(Message.ADMIN);
716 String msg = ser.encodeMsg(hb);
718 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
720 assertTrue(mgr.getCurrent() instanceof QueryState);
724 public void testHandleInternal_IoEx() throws Exception {
727 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
729 assertTrue(mgr.getCurrent() instanceof StartState);
733 public void testHandleInternal_PoolEx() throws Exception {
736 StartState st = (StartState) mgr.getCurrent();
738 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
741 * do NOT set the channel - this will cause the message to be invalid, triggering
745 String msg = ser.encodeMsg(hb);
747 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
749 assertTrue(mgr.getCurrent() instanceof StartState);
753 public void testStartDistributing() throws Exception {
757 // null assignments should cause message to be processed locally
758 mgr.startDistributing(null);
759 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
760 verify(dmaap, times(START_PUB)).publish(any());
763 // message for this host
764 mgr.startDistributing(makeAssignments(true));
765 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
768 // message for another host
769 mgr.startDistributing(makeAssignments(false));
770 assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
774 public void testGoStart() {
775 State st = mgr.goStart();
776 assertTrue(st instanceof StartState);
777 assertEquals(mgr.getHost(), st.getHost());
781 public void testGoQuery() {
782 BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
783 mgr.startDistributing(asgn);
785 State st = mgr.goQuery();
787 assertTrue(st instanceof QueryState);
788 assertEquals(mgr.getHost(), st.getHost());
789 assertEquals(asgn, mgr.getAssignments());
793 public void testGoActive() {
794 BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
795 mgr.startDistributing(asgn);
797 State st = mgr.goActive();
799 assertTrue(st instanceof ActiveState);
800 assertEquals(mgr.getHost(), st.getHost());
801 assertEquals(asgn, mgr.getAssignments());
802 assertEquals(0, active.getCount());
806 public void testGoInactive() {
807 State st = mgr.goInactive();
808 assertTrue(st instanceof InactiveState);
809 assertEquals(mgr.getHost(), st.getHost());
810 assertEquals(1, active.getCount());
814 public void testTimerActionRun() throws Exception {
815 // must start the scheduler
818 CountDownLatch latch = new CountDownLatch(1);
820 mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
826 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
828 verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
831 taskCap.getValue().run();
833 assertEquals(0, latch.getCount());
837 public void testTimerActionRun_DiffState() throws Exception {
838 // must start the scheduler
841 CountDownLatch latch = new CountDownLatch(1);
843 mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
849 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
851 verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
853 // give it a heartbeat so that it transitions to the query state
854 StartState st = (StartState) mgr.getCurrent();
855 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
856 hb.setChannel(Message.ADMIN);
858 String msg = ser.encodeMsg(hb);
860 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
862 assertTrue(mgr.getCurrent() instanceof QueryState);
865 taskCap.getValue().run();
867 // it should NOT have counted down
868 assertEquals(1, latch.getCount());
871 private void validateHandleReqId(String requestId) throws PoolingFeatureException {
874 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
877 private void validateNoForward() throws PoolingFeatureException {
880 // route the message to this host
881 mgr.startDistributing(makeAssignments(true));
883 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
885 verify(dmaap, times(START_PUB)).publish(any());
888 private void validateUnhandled() throws PoolingFeatureException {
890 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
893 private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
896 // buckets have null targets
897 mgr.startDistributing(bucketAssignments);
899 assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
903 * Makes an assignment with two buckets.
905 * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the
906 * manager's bucket, {@code false} if it should hash to the other host's bucket
907 * @return a new bucket assignment
909 private BucketAssignments makeAssignments(boolean sameHost) {
910 int slot = DECODED_EVENT.hashCode() % 2;
912 // slot numbers are 0 and 1 - reverse them if it's for a different host
917 String[] asgn = new String[2];
918 asgn[slot] = mgr.getHost();
919 asgn[1 - slot] = HOST2;
921 return new BucketAssignments(asgn);
925 * Invokes methods necessary to start the manager.
927 * @throws PoolingFeatureException if an error occurs
929 private void startMgr() throws PoolingFeatureException {
935 * Invokes methods necessary to lock the manager.
937 private void lockMgr() {
939 when(controller.isLocked()).thenReturn(true);
943 * Invokes methods necessary to unlock the manager.
945 private void unlockMgr() {
947 when(controller.isLocked()).thenReturn(false);
951 * Used to create a mock object that implements both super interfaces.
953 private static interface ListeningController extends TopicListener, PolicyController {
958 * Manager with overrides.
960 private class PoolingManagerTest extends PoolingManagerImpl {
962 public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
963 CountDownLatch activeLatch) {
965 super(host, controller, props, activeLatch);
969 protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
975 protected ScheduledThreadPoolExecutor makeScheduler() {
981 protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
982 return (drools2 == drools && TOPIC2.equals(topic2));
986 protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
987 if (drools2 == drools && TOPIC2.equals(topic2) && event == THE_EVENT) {
988 return DECODED_EVENT;