2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30 import static org.mockito.ArgumentMatchers.any;
31 import static org.mockito.ArgumentMatchers.contains;
32 import static org.mockito.Mockito.doThrow;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.never;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.when;
39 import java.util.LinkedList;
40 import java.util.Properties;
41 import java.util.Queue;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.ScheduledFuture;
44 import java.util.concurrent.ScheduledThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.mockito.ArgumentCaptor;
49 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
50 import org.onap.policy.common.endpoints.event.comm.TopicListener;
51 import org.onap.policy.drools.controller.DroolsController;
52 import org.onap.policy.drools.pooling.message.BucketAssignments;
53 import org.onap.policy.drools.pooling.message.Heartbeat;
54 import org.onap.policy.drools.pooling.message.Message;
55 import org.onap.policy.drools.pooling.message.Offline;
56 import org.onap.policy.drools.pooling.state.ActiveState;
57 import org.onap.policy.drools.pooling.state.IdleState;
58 import org.onap.policy.drools.pooling.state.InactiveState;
59 import org.onap.policy.drools.pooling.state.QueryState;
60 import org.onap.policy.drools.pooling.state.StartState;
61 import org.onap.policy.drools.pooling.state.State;
62 import org.onap.policy.drools.system.PolicyController;
64 class PoolingManagerImplTest {
// Timer settings that setUp() makes the mocked PoolingProperties return.
// Each value is the previous one plus 1 ms, so the six values are distinct
// (10 through 15 ms). NOTE(review): presumably this lets an assertion tell
// which property a timer was built from - confirm before changing the spacing.
66 protected static final long STD_HEARTBEAT_WAIT_MS = 10;
67 protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
68 protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
69 protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
70 protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
71 protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
73 private static final String MY_HOST = "my.host";
74 private static final String HOST2 = "other.host";
76 private static final String MY_CONTROLLER = "my.controller";
77 private static final String MY_TOPIC = "my.topic";
79 private static final String TOPIC2 = "topic.two";
81 private static final String THE_EVENT = "the event";
83 private static final Object DECODED_EVENT = new Object();
86 * Number of publish() invocations that should be issued when the manager is started.
89 private static final int START_PUB = 1;
92 * Futures that have been allocated due to calls to scheduleXxx().
94 private Queue<ScheduledFuture<?>> futures;
96 private PoolingProperties poolProps;
97 private ListeningController controller;
98 private TopicMessageManager topicMessageManager;
99 private boolean gotManager;
100 private ScheduledThreadPoolExecutor sched;
101 private int schedCount;
102 private DroolsController drools;
103 private Serializer ser;
104 private CountDownLatch active;
106 private PoolingManagerImpl mgr;
111 * @throws Exception if the test fixture cannot be set up
// Builds a fresh fixture: mocked pooling properties, topic message manager,
// controller, drools controller and scheduler, followed by the manager under test.
114 public void setUp() throws Exception {
115 Properties plainProps = new Properties();
// pooling properties: every timer getter returns the matching STD_* constant
117 poolProps = mock(PoolingProperties.class);
118 when(poolProps.getSource()).thenReturn(plainProps);
119 when(poolProps.getPoolingTopic()).thenReturn(MY_TOPIC);
120 when(poolProps.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
121 when(poolProps.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
122 when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
123 when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
124 when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
125 when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
// per-test bookkeeping: allocated futures, a real serializer, and the "active" latch
127 futures = new LinkedList<>();
128 ser = new Serializer();
129 active = new CountDownLatch(1);
// collaborators are all mocks
131 topicMessageManager = mock(TopicMessageManager.class);
133 controller = mock(ListeningController.class);
134 sched = mock(ScheduledThreadPoolExecutor.class);
136 drools = mock(DroolsController.class);
// the controller reports itself as alive and exposes the mocked drools controller
138 when(controller.getName()).thenReturn(MY_CONTROLLER);
139 when(controller.getDrools()).thenReturn(drools);
140 when(controller.isAlive()).thenReturn(true);
// one-shot timers: each schedule() call is answered by creating a freshly mocked future
142 when(sched.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))).thenAnswer(args -> {
143 ScheduledFuture<?> fut = mock(ScheduledFuture.class);
// repeating timers: answered the same way as the one-shot timers
149 when(sched.scheduleWithFixedDelay(any(Runnable.class), any(Long.class), any(Long.class), any(TimeUnit.class)))
150 .thenAnswer(args -> {
151 ScheduledFuture<?> fut = mock(ScheduledFuture.class);
// manager under test; the PoolingManagerTest subclass supplies the mocks through its factory overrides
157 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active);
161 void testPoolingManagerImpl() {
162 assertTrue(gotManager);
164 State st = mgr.getCurrent();
165 assertInstanceOf(IdleState.class, st);
167 // ensure the state is attached to the manager
168 assertEquals(mgr.getHost(), st.getHost());
172 void testPoolingManagerImpl_PoolEx() {
173 // throw an exception when we try to create the topic messages manager
174 PoolingFeatureException ex = new PoolingFeatureException();
176 assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
178 protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
181 }).isInstanceOf(PoolingFeatureRtException.class).hasCause(ex);
185 void testGetCurrent() throws Exception {
186 assertEquals(IdleState.class, mgr.getCurrent().getClass());
190 assertEquals(StartState.class, mgr.getCurrent().getClass());
195 assertEquals(MY_HOST, mgr.getHost());
197 mgr = new PoolingManagerTest(HOST2, controller, poolProps, active);
198 assertEquals(HOST2, mgr.getHost());
202 void testGetTopic() {
203 assertEquals(MY_TOPIC, mgr.getTopic());
207 void testGetProperties() {
208 assertEquals(poolProps, mgr.getProperties());
212 void testBeforeStart() {
216 verify(topicMessageManager).startPublisher();
218 assertEquals(1, schedCount);
219 verify(sched).setMaximumPoolSize(1);
220 verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
223 // try again - nothing should happen
226 verify(topicMessageManager).startPublisher();
228 assertEquals(1, schedCount);
229 verify(sched).setMaximumPoolSize(1);
230 verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
234 void testAfterStart() throws Exception {
237 verify(topicMessageManager).startConsumer(mgr);
239 State st = mgr.getCurrent();
240 assertInstanceOf(StartState.class, st);
242 // ensure the state is attached to the manager
243 assertEquals(mgr.getHost(), st.getHost());
245 ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
246 ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
247 verify(sched).schedule(any(Runnable.class), timeCap.capture(), unitCap.capture());
249 assertEquals(STD_HEARTBEAT_WAIT_MS, timeCap.getValue().longValue());
250 assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
253 // already started - nothing else happens
256 verify(topicMessageManager).startConsumer(mgr);
258 assertInstanceOf(StartState.class, mgr.getCurrent());
260 verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
// Checks the shutdown path: the consumer is stopped, the scheduler is shut down,
// one extra message containing "offline" is published, and the manager is back in
// the idle state.
264 void testBeforeStop() throws Exception {
266 mgr.startDistributing(makeAssignments(false));
// baseline: only the start-up publication(s) so far
268 verify(topicMessageManager, times(START_PUB)).publish(any());
272 verify(topicMessageManager).stopConsumer(mgr);
273 verify(sched).shutdownNow();
// exactly one additional publish, and its text contains "offline"
274 verify(topicMessageManager, times(START_PUB + 1)).publish(any());
275 verify(topicMessageManager).publish(contains("offline"));
277 assertInstanceOf(IdleState.class, mgr.getCurrent());
279 // verify that next message is handled locally
280 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
// publish count is unchanged, so the insert above did not publish anything
281 verify(topicMessageManager, times(START_PUB + 1)).publish(any());
285 void testBeforeStop_NotRunning() {
286 final State st = mgr.getCurrent();
290 verify(topicMessageManager, never()).stopConsumer(any());
291 verify(sched, never()).shutdownNow();
293 // hasn't changed states either
294 assertEquals(st, mgr.getCurrent());
298 void testBeforeStop_AfterPartialStart() {
299 // call beforeStart but not afterStart
302 final State st = mgr.getCurrent();
306 // should still shut the scheduler down
307 verify(sched).shutdownNow();
309 verify(topicMessageManager, never()).stopConsumer(any());
311 // hasn't changed states
312 assertEquals(st, mgr.getCurrent());
316 void testAfterStop() throws Exception {
322 verify(topicMessageManager).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
326 void testBeforeLock() throws Exception {
331 assertInstanceOf(IdleState.class, mgr.getCurrent());
335 void testAfterUnlock_AliveIdle() {
336 // this really shouldn't happen
342 // stays in idle state, because it has no scheduler
343 assertInstanceOf(IdleState.class, mgr.getCurrent());
347 void testAfterUnlock_AliveStarted() throws Exception {
353 assertInstanceOf(StartState.class, mgr.getCurrent());
357 void testAfterUnlock_StoppedIdle() throws Exception {
361 // controller is stopped
362 when(controller.isAlive()).thenReturn(false);
366 assertInstanceOf(IdleState.class, mgr.getCurrent());
370 void testAfterUnlock_StoppedStarted() throws Exception {
373 // Note: don't lockMgr()
375 // controller is stopped
376 when(controller.isAlive()).thenReturn(false);
380 assertInstanceOf(StartState.class, mgr.getCurrent());
384 void testChangeState() throws Exception {
385 // start should invoke changeState()
389 * now go offline while it's locked
393 // should have cancelled the timers
394 assertEquals(2, futures.size());
395 verify(futures.poll()).cancel(false);
396 verify(futures.poll()).cancel(false);
403 // new timers should now be active
404 assertEquals(2, futures.size());
405 verify(futures.poll(), never()).cancel(false);
406 verify(futures.poll(), never()).cancel(false);
// Checks that schedule() passes the task to the scheduler with the requested delay,
// expressed in milliseconds, and that the latch reaches zero once the captured task runs.
410 void testSchedule() throws Exception {
411 // must start the scheduler
414 CountDownLatch latch = new CountDownLatch(1);
416 mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
422 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
423 ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
424 ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
// two schedule() calls are expected. NOTE(review): presumably the first one comes from
// starting the manager (testAfterStart verifies a single such call) - confirm
426 verify(sched, times(2)).schedule(taskCap.capture(), timeCap.capture(), unitCap.capture());
// getValue() yields the most recently captured argument, i.e. the schedule() call made by this test
428 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
429 assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
// run the captured task by hand; the scheduler is a mock and never runs anything itself
432 taskCap.getValue().run();
434 assertEquals(0, latch.getCount());
438 void testScheduleWithFixedDelay() throws Exception {
439 // must start the scheduler
442 CountDownLatch latch = new CountDownLatch(1);
444 mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
450 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
451 ArgumentCaptor<Long> initCap = ArgumentCaptor.forClass(Long.class);
452 ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
453 ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
455 verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
458 assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
459 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
460 assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
463 taskCap.getValue().run();
465 assertEquals(0, latch.getCount());
469 void testPublishAdmin() throws Exception {
470 Offline msg = new Offline(mgr.getHost());
471 mgr.publishAdmin(msg);
473 assertEquals(Message.ADMIN, msg.getChannel());
475 verify(topicMessageManager).publish(any());
479 void testPublish() throws Exception {
480 Offline msg = new Offline(mgr.getHost());
481 mgr.publish("my.channel", msg);
483 assertEquals("my.channel", msg.getChannel());
485 verify(topicMessageManager).publish(any());
489 void testPublish_InvalidMsg() throws Exception {
490 // message is missing data
491 mgr.publish(Message.ADMIN, new Offline());
493 // should not have attempted to publish it
494 verify(topicMessageManager, never()).publish(any());
498 void testPublish_TopicMessageMngEx() throws Exception {
500 // generate exception
501 doThrow(new PoolingFeatureException()).when(topicMessageManager).publish(any());
503 assertThatCode(() -> mgr.publish(Message.ADMIN, new Offline(mgr.getHost()))).doesNotThrowAnyException();
507 void testOnTopicEvent() throws Exception {
510 StartState st = (StartState) mgr.getCurrent();
513 * give it its heart beat, that should cause it to transition to the Query state.
515 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
516 hb.setChannel(Message.ADMIN);
518 String msg = ser.encodeMsg(hb);
520 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
522 assertInstanceOf(QueryState.class, mgr.getCurrent());
526 void testOnTopicEvent_NullEvent() throws Exception {
529 assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException();
533 void testBeforeOffer_Unlocked() throws Exception {
536 // route the message to another host
537 mgr.startDistributing(makeAssignments(false));
539 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
543 void testBeforeOffer_Locked() throws Exception {
547 // route the message to another host
548 mgr.startDistributing(makeAssignments(false));
550 assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
554 void testBeforeInsert() throws Exception {
558 // route the message to this host
559 mgr.startDistributing(makeAssignments(true));
561 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
565 void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
566 validateHandleReqId(null);
570 void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
571 validateHandleReqId("");
575 void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
578 assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
582 void testHandleExternalCommInfrastructureStringStringString() throws Exception {
587 void testHandleExternalForward_NoAssignments() throws Exception {
592 void testHandleExternalForward() throws Exception {
597 void testHandleEvent_NullTarget() throws Exception {
598 // buckets have null targets
599 validateDiscarded(new BucketAssignments(new String[] {null, null}));
603 void testHandleEvent_SameHost() throws Exception {
608 void testHandleEvent_DiffHost() throws Exception {
609 // route the message to the *OTHER* host
610 validateDiscarded(makeAssignments(false));
614 void testDecodeEvent_CannotDecode() throws Exception {
616 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
618 protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
625 when(controller.isLocked()).thenReturn(true);
627 // create assignments, though they are irrelevant
628 mgr.startDistributing(makeAssignments(false));
630 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
634 void testDecodeEvent_UnsuppEx() throws Exception {
636 // generate exception
637 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
639 protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
640 throw new UnsupportedOperationException();
646 when(controller.isLocked()).thenReturn(true);
648 // create assignments, though they are irrelevant
649 mgr.startDistributing(makeAssignments(false));
651 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
655 void testDecodeEvent_ArgEx() throws Exception {
656 // generate exception
657 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
659 protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
660 throw new IllegalArgumentException();
666 when(controller.isLocked()).thenReturn(true);
668 // create assignments, though they are irrelevant
669 mgr.startDistributing(makeAssignments(false));
671 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
675 void testDecodeEvent_StateEx() throws Exception {
676 // generate exception
677 mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
679 protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
680 throw new IllegalStateException();
686 when(controller.isLocked()).thenReturn(true);
688 // create assignments, though they are irrelevant
689 mgr.startDistributing(makeAssignments(false));
691 assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
695 void testDecodeEvent() throws Exception {
698 when(controller.isLocked()).thenReturn(true);
700 // route to another host
701 mgr.startDistributing(makeAssignments(false));
703 assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
707 void testHandleInternal() throws Exception {
710 StartState st = (StartState) mgr.getCurrent();
713 * give it its heart beat, that should cause it to transition to the Query state.
715 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
716 hb.setChannel(Message.ADMIN);
718 String msg = ser.encodeMsg(hb);
720 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
722 assertInstanceOf(QueryState.class, mgr.getCurrent());
726 void testHandleInternal_IoEx() throws Exception {
729 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
731 assertInstanceOf(StartState.class, mgr.getCurrent());
735 void testHandleInternal_PoolEx() throws Exception {
738 StartState st = (StartState) mgr.getCurrent();
740 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
743 * do NOT set the channel - this will cause the message to be invalid, triggering an exception
747 String msg = ser.encodeMsg(hb);
749 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
751 assertInstanceOf(StartState.class, mgr.getCurrent());
// Exercises startDistributing() with three inputs and checks what beforeInsert()
// returns for DECODED_EVENT in each case: false for null assignments, false when the
// event's bucket belongs to this host, true when it belongs to the other host.
755 void testStartDistributing() throws Exception {
759 // null assignments should cause message to be processed locally
760 mgr.startDistributing(null);
761 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
// nothing beyond the start-up publication(s) was published
762 verify(topicMessageManager, times(START_PUB)).publish(any());
765 // message for this host
766 mgr.startDistributing(makeAssignments(true));
767 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
770 // message for another host
771 mgr.startDistributing(makeAssignments(false));
772 assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
777 State st = mgr.goStart();
778 assertInstanceOf(StartState.class, st);
779 assertEquals(mgr.getHost(), st.getHost());
784 BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
785 mgr.startDistributing(asgn);
787 State st = mgr.goQuery();
789 assertInstanceOf(QueryState.class, st);
790 assertEquals(mgr.getHost(), st.getHost());
791 assertEquals(asgn, mgr.getAssignments());
795 void testGoActive() {
796 BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
797 mgr.startDistributing(asgn);
799 State st = mgr.goActive();
801 assertInstanceOf(ActiveState.class, st);
802 assertEquals(mgr.getHost(), st.getHost());
803 assertEquals(asgn, mgr.getAssignments());
804 assertEquals(0, active.getCount());
808 void testGoInactive() {
809 State st = mgr.goInactive();
810 assertInstanceOf(InactiveState.class, st);
811 assertEquals(mgr.getHost(), st.getHost());
812 assertEquals(1, active.getCount());
816 void testTimerActionRun() throws Exception {
817 // must start the scheduler
820 CountDownLatch latch = new CountDownLatch(1);
822 mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
828 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
830 verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
833 taskCap.getValue().run();
835 assertEquals(0, latch.getCount());
839 void testTimerActionRun_DiffState() throws Exception {
840 // must start the scheduler
843 CountDownLatch latch = new CountDownLatch(1);
845 mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
851 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
853 verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
855 // give it a heartbeat so that it transitions to the query state
856 StartState st = (StartState) mgr.getCurrent();
857 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
858 hb.setChannel(Message.ADMIN);
860 String msg = ser.encodeMsg(hb);
862 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
864 assertInstanceOf(QueryState.class, mgr.getCurrent());
867 taskCap.getValue().run();
869 // it should NOT have counted down
870 assertEquals(1, latch.getCount());
// Shared check for the request-id tests: beforeInsert() must return false for DECODED_EVENT.
// NOTE(review): requestId is not referenced by the statements shown here; callers pass
// null and "" - check whether the parameter is still needed.
873 private void validateHandleReqId(String requestId) throws PoolingFeatureException {
876 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
879 private void validateNoForward() throws PoolingFeatureException {
882 // route the message to this host
883 mgr.startDistributing(makeAssignments(true));
885 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
887 verify(topicMessageManager, times(START_PUB)).publish(any());
890 private void validateUnhandled() throws PoolingFeatureException {
892 assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
// Installs the given bucket assignments and asserts that beforeInsert() returns true
// for DECODED_EVENT.
895 private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
898 // install the caller-supplied assignments (callers pass null targets or the other host's buckets)
899 mgr.startDistributing(bucketAssignments);
901 assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
905 * Makes an assignment with two buckets.
907 * @param sameHost {@code true} if {@code DECODED_EVENT} should hash to the
908 * manager's bucket, {@code false} if it should hash to the other host's bucket
909 * @return a new bucket assignment
911 private BucketAssignments makeAssignments(boolean sameHost) {
// NOTE(review): '%' keeps the sign of its left operand, so a negative hashCode() would
// make slot -1 and the array stores below would throw ArrayIndexOutOfBoundsException.
// DECODED_EVENT is a plain Object; presumably its identity hash is never negative on
// the JVMs in use - confirm, or switch to Math.floorMod.
912 int slot = DECODED_EVENT.hashCode() % 2;
914 // slot numbers are 0 and 1 - reverse them if it's for a different host
919 String[] asgn = new String[2];
// the manager's host owns bucket "slot"; HOST2 owns the other bucket
920 asgn[slot] = mgr.getHost();
921 asgn[1 - slot] = HOST2;
923 return new BucketAssignments(asgn);
927 * Invokes methods necessary to start the manager.
930 private void startMgr() {
936 * Invokes methods necessary to lock the manager.
938 private void lockMgr() {
940 when(controller.isLocked()).thenReturn(true);
944 * Invokes methods necessary to unlock the manager.
946 private void unlockMgr() {
948 when(controller.isLocked()).thenReturn(false);
952 * Used to create a mock object that implements both super interfaces.
954 private static interface ListeningController extends TopicListener, PolicyController {
959 * Manager with overrides.
961 private class PoolingManagerTest extends PoolingManagerImpl {
963 public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
964 CountDownLatch activeLatch) {
966 super(host, controller, props, activeLatch);
970 protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
972 return topicMessageManager;
976 protected ScheduledThreadPoolExecutor makeScheduler() {
// Reports an event as decodable only when it arrives for the test's mocked drools
// controller (compared by reference) on TOPIC2.
982 protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
983 return (drools2 == drools && TOPIC2.equals(topic2));
987 protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
988 if (drools2 == drools && TOPIC2.equals(topic2) && event == THE_EVENT) {
989 return DECODED_EVENT;