Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-messages / src / test / java / org / onap / policy / drools / pooling / PoolingManagerImplTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30 import static org.mockito.ArgumentMatchers.any;
31 import static org.mockito.ArgumentMatchers.contains;
32 import static org.mockito.Mockito.doThrow;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.never;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.when;
38
39 import java.util.LinkedList;
40 import java.util.Properties;
41 import java.util.Queue;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.ScheduledFuture;
44 import java.util.concurrent.ScheduledThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.mockito.ArgumentCaptor;
49 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
50 import org.onap.policy.common.endpoints.event.comm.TopicListener;
51 import org.onap.policy.drools.controller.DroolsController;
52 import org.onap.policy.drools.pooling.message.BucketAssignments;
53 import org.onap.policy.drools.pooling.message.Heartbeat;
54 import org.onap.policy.drools.pooling.message.Message;
55 import org.onap.policy.drools.pooling.message.Offline;
56 import org.onap.policy.drools.pooling.state.ActiveState;
57 import org.onap.policy.drools.pooling.state.IdleState;
58 import org.onap.policy.drools.pooling.state.InactiveState;
59 import org.onap.policy.drools.pooling.state.QueryState;
60 import org.onap.policy.drools.pooling.state.StartState;
61 import org.onap.policy.drools.pooling.state.State;
62 import org.onap.policy.drools.system.PolicyController;
63
64 class PoolingManagerImplTest {
65
66     protected static final long STD_HEARTBEAT_WAIT_MS = 10;
67     protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
68     protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
69     protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
70     protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
71     protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
72
73     private static final String MY_HOST = "my.host";
74     private static final String HOST2 = "other.host";
75
76     private static final String MY_CONTROLLER = "my.controller";
77     private static final String MY_TOPIC = "my.topic";
78
79     private static final String TOPIC2 = "topic.two";
80
81     private static final String THE_EVENT = "the event";
82
83     private static final Object DECODED_EVENT = new Object();
84
85     /**
86      * Number of publish() invocations that should be issued when the manager is
87      * started.
88      */
89     private static final int START_PUB = 1;
90
91     /**
92      * Futures that have been allocated due to calls to scheduleXxx().
93      */
94     private Queue<ScheduledFuture<?>> futures;
95
96     private PoolingProperties poolProps;
97     private ListeningController controller;
98     private TopicMessageManager topicMessageManager;
99     private boolean gotManager;
100     private ScheduledThreadPoolExecutor sched;
101     private int schedCount;
102     private DroolsController drools;
103     private Serializer ser;
104     private CountDownLatch active;
105
106     private PoolingManagerImpl mgr;
107
108     /**
109      * Setup.
110      *
111      * @throws Exception throws exception
112      */
113     @BeforeEach
114     public void setUp() throws Exception {
115         Properties plainProps = new Properties();
116
117         poolProps = mock(PoolingProperties.class);
118         when(poolProps.getSource()).thenReturn(plainProps);
119         when(poolProps.getPoolingTopic()).thenReturn(MY_TOPIC);
120         when(poolProps.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
121         when(poolProps.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
122         when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
123         when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
124         when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
125         when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
126
127         futures = new LinkedList<>();
128         ser = new Serializer();
129         active = new CountDownLatch(1);
130
131         topicMessageManager = mock(TopicMessageManager.class);
132         gotManager = false;
133         controller = mock(ListeningController.class);
134         sched = mock(ScheduledThreadPoolExecutor.class);
135         schedCount = 0;
136         drools = mock(DroolsController.class);
137
138         when(controller.getName()).thenReturn(MY_CONTROLLER);
139         when(controller.getDrools()).thenReturn(drools);
140         when(controller.isAlive()).thenReturn(true);
141
142         when(sched.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))).thenAnswer(args -> {
143             ScheduledFuture<?> fut = mock(ScheduledFuture.class);
144             futures.add(fut);
145
146             return fut;
147         });
148
149         when(sched.scheduleWithFixedDelay(any(Runnable.class), any(Long.class), any(Long.class), any(TimeUnit.class)))
150                         .thenAnswer(args -> {
151                             ScheduledFuture<?> fut = mock(ScheduledFuture.class);
152                             futures.add(fut);
153
154                             return fut;
155                         });
156
157         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active);
158     }
159
160     @Test
161     void testPoolingManagerImpl() {
162         assertTrue(gotManager);
163
164         State st = mgr.getCurrent();
165         assertInstanceOf(IdleState.class, st);
166
167         // ensure the state is attached to the manager
168         assertEquals(mgr.getHost(), st.getHost());
169     }
170
171     @Test
172     void testPoolingManagerImpl_PoolEx() {
173         // throw an exception when we try to create the topic messages manager
174         PoolingFeatureException ex = new PoolingFeatureException();
175
176         assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
177             @Override
178             protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
179                 throw ex;
180             }
181         }).isInstanceOf(PoolingFeatureRtException.class).hasCause(ex);
182     }
183
184     @Test
185     void testGetCurrent() throws Exception {
186         assertEquals(IdleState.class, mgr.getCurrent().getClass());
187
188         startMgr();
189
190         assertEquals(StartState.class, mgr.getCurrent().getClass());
191     }
192
193     @Test
194     void testGetHost() {
195         assertEquals(MY_HOST, mgr.getHost());
196
197         mgr = new PoolingManagerTest(HOST2, controller, poolProps, active);
198         assertEquals(HOST2, mgr.getHost());
199     }
200
201     @Test
202     void testGetTopic() {
203         assertEquals(MY_TOPIC, mgr.getTopic());
204     }
205
206     @Test
207     void testGetProperties() {
208         assertEquals(poolProps, mgr.getProperties());
209     }
210
211     @Test
212     void testBeforeStart() {
213         // not running yet
214         mgr.beforeStart();
215
216         verify(topicMessageManager).startPublisher();
217
218         assertEquals(1, schedCount);
219         verify(sched).setMaximumPoolSize(1);
220         verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
221
222
223         // try again - nothing should happen
224         mgr.beforeStart();
225
226         verify(topicMessageManager).startPublisher();
227
228         assertEquals(1, schedCount);
229         verify(sched).setMaximumPoolSize(1);
230         verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
231     }
232
233     @Test
234     void testAfterStart() throws Exception {
235         startMgr();
236
237         verify(topicMessageManager).startConsumer(mgr);
238
239         State st = mgr.getCurrent();
240         assertInstanceOf(StartState.class, st);
241
242         // ensure the state is attached to the manager
243         assertEquals(mgr.getHost(), st.getHost());
244
245         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
246         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
247         verify(sched).schedule(any(Runnable.class), timeCap.capture(), unitCap.capture());
248
249         assertEquals(STD_HEARTBEAT_WAIT_MS, timeCap.getValue().longValue());
250         assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
251
252
253         // already started - nothing else happens
254         mgr.afterStart();
255
256         verify(topicMessageManager).startConsumer(mgr);
257
258         assertInstanceOf(StartState.class, mgr.getCurrent());
259
260         verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
261     }
262
263     @Test
264     void testBeforeStop() throws Exception {
265         startMgr();
266         mgr.startDistributing(makeAssignments(false));
267
268         verify(topicMessageManager, times(START_PUB)).publish(any());
269
270         mgr.beforeStop();
271
272         verify(topicMessageManager).stopConsumer(mgr);
273         verify(sched).shutdownNow();
274         verify(topicMessageManager, times(START_PUB + 1)).publish(any());
275         verify(topicMessageManager).publish(contains("offline"));
276
277         assertInstanceOf(IdleState.class, mgr.getCurrent());
278
279         // verify that next message is handled locally
280         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
281         verify(topicMessageManager, times(START_PUB + 1)).publish(any());
282     }
283
284     @Test
285     void testBeforeStop_NotRunning() {
286         final State st = mgr.getCurrent();
287
288         mgr.beforeStop();
289
290         verify(topicMessageManager, never()).stopConsumer(any());
291         verify(sched, never()).shutdownNow();
292
293         // hasn't changed states either
294         assertEquals(st, mgr.getCurrent());
295     }
296
297     @Test
298     void testBeforeStop_AfterPartialStart() {
299         // call beforeStart but not afterStart
300         mgr.beforeStart();
301
302         final State st = mgr.getCurrent();
303
304         mgr.beforeStop();
305
306         // should still shut the scheduler down
307         verify(sched).shutdownNow();
308
309         verify(topicMessageManager, never()).stopConsumer(any());
310
311         // hasn't changed states
312         assertEquals(st, mgr.getCurrent());
313     }
314
315     @Test
316     void testAfterStop() throws Exception {
317         startMgr();
318         mgr.beforeStop();
319
320         mgr.afterStop();
321
322         verify(topicMessageManager).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
323     }
324
325     @Test
326     void testBeforeLock() throws Exception {
327         startMgr();
328
329         mgr.beforeLock();
330
331         assertInstanceOf(IdleState.class, mgr.getCurrent());
332     }
333
334     @Test
335     void testAfterUnlock_AliveIdle() {
336         // this really shouldn't happen
337
338         lockMgr();
339
340         mgr.afterUnlock();
341
342         // stays in idle state, because it has no scheduler
343         assertInstanceOf(IdleState.class, mgr.getCurrent());
344     }
345
346     @Test
347     void testAfterUnlock_AliveStarted() throws Exception {
348         startMgr();
349         lockMgr();
350
351         mgr.afterUnlock();
352
353         assertInstanceOf(StartState.class, mgr.getCurrent());
354     }
355
356     @Test
357     void testAfterUnlock_StoppedIdle() throws Exception {
358         startMgr();
359         lockMgr();
360
361         // controller is stopped
362         when(controller.isAlive()).thenReturn(false);
363
364         mgr.afterUnlock();
365
366         assertInstanceOf(IdleState.class, mgr.getCurrent());
367     }
368
369     @Test
370     void testAfterUnlock_StoppedStarted() throws Exception {
371         startMgr();
372
373         // Note: don't lockMgr()
374
375         // controller is stopped
376         when(controller.isAlive()).thenReturn(false);
377
378         mgr.afterUnlock();
379
380         assertInstanceOf(StartState.class, mgr.getCurrent());
381     }
382
383     @Test
384     void testChangeState() throws Exception {
385         // start should invoke changeState()
386         startMgr();
387
388         /*
389          * now go offline while it's locked
390          */
391         lockMgr();
392
393         // should have cancelled the timers
394         assertEquals(2, futures.size());
395         verify(futures.poll()).cancel(false);
396         verify(futures.poll()).cancel(false);
397
398         /*
399          * now go back online
400          */
401         unlockMgr();
402
403         // new timers should now be active
404         assertEquals(2, futures.size());
405         verify(futures.poll(), never()).cancel(false);
406         verify(futures.poll(), never()).cancel(false);
407     }
408
409     @Test
410     void testSchedule() throws Exception {
411         // must start the scheduler
412         startMgr();
413
414         CountDownLatch latch = new CountDownLatch(1);
415
416         mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
417             latch.countDown();
418             return null;
419         });
420
421         // capture the task
422         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
423         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
424         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
425
426         verify(sched, times(2)).schedule(taskCap.capture(), timeCap.capture(), unitCap.capture());
427
428         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
429         assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
430
431         // execute it
432         taskCap.getValue().run();
433
434         assertEquals(0, latch.getCount());
435     }
436
437     @Test
438     void testScheduleWithFixedDelay() throws Exception {
439         // must start the scheduler
440         startMgr();
441
442         CountDownLatch latch = new CountDownLatch(1);
443
444         mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
445             latch.countDown();
446             return null;
447         });
448
449         // capture the task
450         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
451         ArgumentCaptor<Long> initCap = ArgumentCaptor.forClass(Long.class);
452         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
453         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
454
455         verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
456                         unitCap.capture());
457
458         assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
459         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
460         assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
461
462         // execute it
463         taskCap.getValue().run();
464
465         assertEquals(0, latch.getCount());
466     }
467
468     @Test
469     void testPublishAdmin() throws Exception {
470         Offline msg = new Offline(mgr.getHost());
471         mgr.publishAdmin(msg);
472
473         assertEquals(Message.ADMIN, msg.getChannel());
474
475         verify(topicMessageManager).publish(any());
476     }
477
478     @Test
479     void testPublish() throws Exception {
480         Offline msg = new Offline(mgr.getHost());
481         mgr.publish("my.channel", msg);
482
483         assertEquals("my.channel", msg.getChannel());
484
485         verify(topicMessageManager).publish(any());
486     }
487
488     @Test
489     void testPublish_InvalidMsg() throws Exception {
490         // message is missing data
491         mgr.publish(Message.ADMIN, new Offline());
492
493         // should not have attempted to publish it
494         verify(topicMessageManager, never()).publish(any());
495     }
496
497     @Test
498     void testPublish_TopicMessageMngEx() throws Exception {
499
500         // generate exception
501         doThrow(new PoolingFeatureException()).when(topicMessageManager).publish(any());
502
503         assertThatCode(() -> mgr.publish(Message.ADMIN, new Offline(mgr.getHost()))).doesNotThrowAnyException();
504     }
505
506     @Test
507     void testOnTopicEvent() throws Exception {
508         startMgr();
509
510         StartState st = (StartState) mgr.getCurrent();
511
512         /*
513          * give it its heart beat, that should cause it to transition to the Query state.
514          */
515         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
516         hb.setChannel(Message.ADMIN);
517
518         String msg = ser.encodeMsg(hb);
519
520         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
521
522         assertInstanceOf(QueryState.class, mgr.getCurrent());
523     }
524
525     @Test
526     void testOnTopicEvent_NullEvent() throws Exception {
527         startMgr();
528
529         assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException();
530     }
531
532     @Test
533     void testBeforeOffer_Unlocked() throws Exception {
534         startMgr();
535
536         // route the message to another host
537         mgr.startDistributing(makeAssignments(false));
538
539         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
540     }
541
542     @Test
543     void testBeforeOffer_Locked() throws Exception {
544         startMgr();
545         lockMgr();
546
547         // route the message to another host
548         mgr.startDistributing(makeAssignments(false));
549
550         assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
551     }
552
553     @Test
554     void testBeforeInsert() throws Exception {
555         startMgr();
556         lockMgr();
557
558         // route the message to this host
559         mgr.startDistributing(makeAssignments(true));
560
561         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
562     }
563
564     @Test
565     void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
566         validateHandleReqId(null);
567     }
568
569     @Test
570     void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
571         validateHandleReqId("");
572     }
573
574     @Test
575     void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
576         startMgr();
577
578         assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
579     }
580
581     @Test
582     void testHandleExternalCommInfrastructureStringStringString() throws Exception {
583         validateUnhandled();
584     }
585
586     @Test
587     void testHandleExternalForward_NoAssignments() throws Exception {
588         validateUnhandled();
589     }
590
591     @Test
592     void testHandleExternalForward() throws Exception {
593         validateNoForward();
594     }
595
596     @Test
597     void testHandleEvent_NullTarget() throws Exception {
598         // buckets have null targets
599         validateDiscarded(new BucketAssignments(new String[] {null, null}));
600     }
601
602     @Test
603     void testHandleEvent_SameHost() throws Exception {
604         validateNoForward();
605     }
606
607     @Test
608     void testHandleEvent_DiffHost() throws Exception {
609         // route the message to the *OTHER* host
610         validateDiscarded(makeAssignments(false));
611     }
612
613     @Test
614     void testDecodeEvent_CannotDecode() throws Exception {
615
616         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
617             @Override
618             protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
619                 return false;
620             }
621         };
622
623         startMgr();
624
625         when(controller.isLocked()).thenReturn(true);
626
627         // create assignments, though they are irrelevant
628         mgr.startDistributing(makeAssignments(false));
629
630         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
631     }
632
633     @Test
634     void testDecodeEvent_UnsuppEx() throws Exception {
635
636         // generate exception
637         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
638             @Override
639             protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
640                 throw new UnsupportedOperationException();
641             }
642         };
643
644         startMgr();
645
646         when(controller.isLocked()).thenReturn(true);
647
648         // create assignments, though they are irrelevant
649         mgr.startDistributing(makeAssignments(false));
650
651         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
652     }
653
654     @Test
655     void testDecodeEvent_ArgEx() throws Exception {
656         // generate exception
657         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
658             @Override
659             protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
660                 throw new IllegalArgumentException();
661             }
662         };
663
664         startMgr();
665
666         when(controller.isLocked()).thenReturn(true);
667
668         // create assignments, though they are irrelevant
669         mgr.startDistributing(makeAssignments(false));
670
671         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
672     }
673
674     @Test
675     void testDecodeEvent_StateEx() throws Exception {
676         // generate exception
677         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
678             @Override
679             protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
680                 throw new IllegalStateException();
681             }
682         };
683
684         startMgr();
685
686         when(controller.isLocked()).thenReturn(true);
687
688         // create assignments, though they are irrelevant
689         mgr.startDistributing(makeAssignments(false));
690
691         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
692     }
693
694     @Test
695     void testDecodeEvent() throws Exception {
696         startMgr();
697
698         when(controller.isLocked()).thenReturn(true);
699
700         // route to another host
701         mgr.startDistributing(makeAssignments(false));
702
703         assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
704     }
705
706     @Test
707     void testHandleInternal() throws Exception {
708         startMgr();
709
710         StartState st = (StartState) mgr.getCurrent();
711
712         /*
713          * give it its heart beat, that should cause it to transition to the Query state.
714          */
715         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
716         hb.setChannel(Message.ADMIN);
717
718         String msg = ser.encodeMsg(hb);
719
720         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
721
722         assertInstanceOf(QueryState.class, mgr.getCurrent());
723     }
724
725     @Test
726     void testHandleInternal_IoEx() throws Exception {
727         startMgr();
728
729         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
730
731         assertInstanceOf(StartState.class, mgr.getCurrent());
732     }
733
734     @Test
735     void testHandleInternal_PoolEx() throws Exception {
736         startMgr();
737
738         StartState st = (StartState) mgr.getCurrent();
739
740         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
741
742         /*
743          * do NOT set the channel - this will cause the message to be invalid, triggering
744          * an exception
745          */
746
747         String msg = ser.encodeMsg(hb);
748
749         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
750
751         assertInstanceOf(StartState.class, mgr.getCurrent());
752     }
753
754     @Test
755     void testStartDistributing() throws Exception {
756         validateNoForward();
757
758
759         // null assignments should cause message to be processed locally
760         mgr.startDistributing(null);
761         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
762         verify(topicMessageManager, times(START_PUB)).publish(any());
763
764
765         // message for this host
766         mgr.startDistributing(makeAssignments(true));
767         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
768
769
770         // message for another host
771         mgr.startDistributing(makeAssignments(false));
772         assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
773     }
774
775     @Test
776     void testGoStart() {
777         State st = mgr.goStart();
778         assertInstanceOf(StartState.class, st);
779         assertEquals(mgr.getHost(), st.getHost());
780     }
781
782     @Test
783     void testGoQuery() {
784         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
785         mgr.startDistributing(asgn);
786
787         State st = mgr.goQuery();
788
789         assertInstanceOf(QueryState.class, st);
790         assertEquals(mgr.getHost(), st.getHost());
791         assertEquals(asgn, mgr.getAssignments());
792     }
793
794     @Test
795     void testGoActive() {
796         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
797         mgr.startDistributing(asgn);
798
799         State st = mgr.goActive();
800
801         assertInstanceOf(ActiveState.class, st);
802         assertEquals(mgr.getHost(), st.getHost());
803         assertEquals(asgn, mgr.getAssignments());
804         assertEquals(0, active.getCount());
805     }
806
807     @Test
808     void testGoInactive() {
809         State st = mgr.goInactive();
810         assertInstanceOf(InactiveState.class, st);
811         assertEquals(mgr.getHost(), st.getHost());
812         assertEquals(1, active.getCount());
813     }
814
815     @Test
816     void testTimerActionRun() throws Exception {
817         // must start the scheduler
818         startMgr();
819
820         CountDownLatch latch = new CountDownLatch(1);
821
822         mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
823             latch.countDown();
824             return null;
825         });
826
827         // capture the task
828         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
829
830         verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
831
832         // execute it
833         taskCap.getValue().run();
834
835         assertEquals(0, latch.getCount());
836     }
837
838     @Test
839     void testTimerActionRun_DiffState() throws Exception {
840         // must start the scheduler
841         startMgr();
842
843         CountDownLatch latch = new CountDownLatch(1);
844
845         mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
846             latch.countDown();
847             return null;
848         });
849
850         // capture the task
851         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
852
853         verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
854
855         // give it a heartbeat so that it transitions to the query state
856         StartState st = (StartState) mgr.getCurrent();
857         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
858         hb.setChannel(Message.ADMIN);
859
860         String msg = ser.encodeMsg(hb);
861
862         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
863
864         assertInstanceOf(QueryState.class, mgr.getCurrent());
865
866         // execute it
867         taskCap.getValue().run();
868
869         // it should NOT have counted down
870         assertEquals(1, latch.getCount());
871     }
872
873     private void validateHandleReqId(String requestId) throws PoolingFeatureException {
874         startMgr();
875
876         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
877     }
878
879     private void validateNoForward() throws PoolingFeatureException {
880         startMgr();
881
882         // route the message to this host
883         mgr.startDistributing(makeAssignments(true));
884
885         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
886
887         verify(topicMessageManager, times(START_PUB)).publish(any());
888     }
889
890     private void validateUnhandled() throws PoolingFeatureException {
891         startMgr();
892         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
893     }
894
895     private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
896         startMgr();
897
898         // buckets have null targets
899         mgr.startDistributing(bucketAssignments);
900
901         assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
902     }
903
904     /**
905      * Makes an assignment with two buckets.
906      *
907      * @param sameHost {@code true} if the REQUEST_ID should hash to the
908      *        manager's bucket, {@code false} if it should hash to the other host's bucket
909      * @return a new bucket assignment
910      */
911     private BucketAssignments makeAssignments(boolean sameHost) {
912         int slot = DECODED_EVENT.hashCode() % 2;
913
914         // slot numbers are 0 and 1 - reverse them if it's for a different host
915         if (!sameHost) {
916             slot = 1 - slot;
917         }
918
919         String[] asgn = new String[2];
920         asgn[slot] = mgr.getHost();
921         asgn[1 - slot] = HOST2;
922
923         return new BucketAssignments(asgn);
924     }
925
926     /**
927      * Invokes methods necessary to start the manager.
928      *
929      */
930     private void startMgr() {
931         mgr.beforeStart();
932         mgr.afterStart();
933     }
934
935     /**
936      * Invokes methods necessary to lock the manager.
937      */
938     private void lockMgr() {
939         mgr.beforeLock();
940         when(controller.isLocked()).thenReturn(true);
941     }
942
943     /**
944      * Invokes methods necessary to unlock the manager.
945      */
946     private void unlockMgr() {
947         mgr.afterUnlock();
948         when(controller.isLocked()).thenReturn(false);
949     }
950
951     /**
952      * Used to create a mock object that implements both super interfaces.
953      */
954     private static interface ListeningController extends TopicListener, PolicyController {
955
956     }
957
958     /**
959      * Manager with overrides.
960      */
961     private class PoolingManagerTest extends PoolingManagerImpl {
962
963         public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
964                         CountDownLatch activeLatch) {
965
966             super(host, controller, props, activeLatch);
967         }
968
969         @Override
970         protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
971             gotManager = true;
972             return topicMessageManager;
973         }
974
975         @Override
976         protected ScheduledThreadPoolExecutor makeScheduler() {
977             ++schedCount;
978             return sched;
979         }
980
981         @Override
982         protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
983             return (drools2 == drools && TOPIC2.equals(topic2));
984         }
985
986         @Override
987         protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
988             if (drools2 == drools && TOPIC2.equals(topic2) && event == THE_EVENT) {
989                 return DECODED_EVENT;
990             } else {
991                 return null;
992             }
993         }
994     }
995 }