Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / PoolingManagerImplTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.ArgumentMatchers.contains;
30 import static org.mockito.Mockito.doThrow;
31 import static org.mockito.Mockito.mock;
32 import static org.mockito.Mockito.never;
33 import static org.mockito.Mockito.times;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
36
37 import java.util.LinkedList;
38 import java.util.Properties;
39 import java.util.Queue;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ScheduledFuture;
42 import java.util.concurrent.ScheduledThreadPoolExecutor;
43 import java.util.concurrent.TimeUnit;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
48 import org.onap.policy.common.endpoints.event.comm.TopicListener;
49 import org.onap.policy.drools.controller.DroolsController;
50 import org.onap.policy.drools.pooling.message.BucketAssignments;
51 import org.onap.policy.drools.pooling.message.Heartbeat;
52 import org.onap.policy.drools.pooling.message.Message;
53 import org.onap.policy.drools.pooling.message.Offline;
54 import org.onap.policy.drools.pooling.state.ActiveState;
55 import org.onap.policy.drools.pooling.state.IdleState;
56 import org.onap.policy.drools.pooling.state.InactiveState;
57 import org.onap.policy.drools.pooling.state.QueryState;
58 import org.onap.policy.drools.pooling.state.StartState;
59 import org.onap.policy.drools.pooling.state.State;
60 import org.onap.policy.drools.system.PolicyController;
61
62 public class PoolingManagerImplTest {
63
64     protected static final long STD_HEARTBEAT_WAIT_MS = 10;
65     protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
66     protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
67     protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
68     protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
69     protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
70
71     private static final String MY_HOST = "my.host";
72     private static final String HOST2 = "other.host";
73
74     private static final String MY_CONTROLLER = "my.controller";
75     private static final String MY_TOPIC = "my.topic";
76
77     private static final String TOPIC2 = "topic.two";
78
79     private static final String THE_EVENT = "the event";
80
81     private static final Object DECODED_EVENT = new Object();
82
83     /**
84      * Number of dmaap.publish() invocations that should be issued when the manager is
85      * started.
86      */
87     private static final int START_PUB = 1;
88
89     /**
90      * Futures that have been allocated due to calls to scheduleXxx().
91      */
92     private Queue<ScheduledFuture<?>> futures;
93
94     private PoolingProperties poolProps;
95     private ListeningController controller;
96     private DmaapManager dmaap;
97     private boolean gotDmaap;
98     private ScheduledThreadPoolExecutor sched;
99     private int schedCount;
100     private DroolsController drools;
101     private Serializer ser;
102     private CountDownLatch active;
103
104     private PoolingManagerImpl mgr;
105
106     /**
107      * Setup.
108      *
109      * @throws Exception throws exception
110      */
111     @Before
112     public void setUp() throws Exception {
113         Properties plainProps = new Properties();
114
115         poolProps = mock(PoolingProperties.class);
116         when(poolProps.getSource()).thenReturn(plainProps);
117         when(poolProps.getPoolingTopic()).thenReturn(MY_TOPIC);
118         when(poolProps.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
119         when(poolProps.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
120         when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
121         when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
122         when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
123         when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
124
125         futures = new LinkedList<>();
126         ser = new Serializer();
127         active = new CountDownLatch(1);
128
129         dmaap = mock(DmaapManager.class);
130         gotDmaap = false;
131         controller = mock(ListeningController.class);
132         sched = mock(ScheduledThreadPoolExecutor.class);
133         schedCount = 0;
134         drools = mock(DroolsController.class);
135
136         when(controller.getName()).thenReturn(MY_CONTROLLER);
137         when(controller.getDrools()).thenReturn(drools);
138         when(controller.isAlive()).thenReturn(true);
139
140         when(sched.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))).thenAnswer(args -> {
141             ScheduledFuture<?> fut = mock(ScheduledFuture.class);
142             futures.add(fut);
143
144             return fut;
145         });
146
147         when(sched.scheduleWithFixedDelay(any(Runnable.class), any(Long.class), any(Long.class), any(TimeUnit.class)))
148                         .thenAnswer(args -> {
149                             ScheduledFuture<?> fut = mock(ScheduledFuture.class);
150                             futures.add(fut);
151
152                             return fut;
153                         });
154
155         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active);
156     }
157
158     @Test
159     public void testPoolingManagerImpl() throws Exception {
160         assertTrue(gotDmaap);
161
162         State st = mgr.getCurrent();
163         assertTrue(st instanceof IdleState);
164
165         // ensure the state is attached to the manager
166         assertEquals(mgr.getHost(), st.getHost());
167     }
168
169     @Test
170     public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
171         // throw an exception when we try to create the dmaap manager
172         PoolingFeatureException ex = new PoolingFeatureException();
173
174         assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
175             @Override
176             protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
177                 throw ex;
178             }
179         }).isInstanceOf(PoolingFeatureRtException.class).hasCause(ex);
180     }
181
182     @Test
183     public void testGetCurrent() throws Exception {
184         assertEquals(IdleState.class, mgr.getCurrent().getClass());
185
186         startMgr();
187
188         assertEquals(StartState.class, mgr.getCurrent().getClass());
189     }
190
191     @Test
192     public void testGetHost() {
193         assertEquals(MY_HOST, mgr.getHost());
194
195         mgr = new PoolingManagerTest(HOST2, controller, poolProps, active);
196         assertEquals(HOST2, mgr.getHost());
197     }
198
199     @Test
200     public void testGetTopic() {
201         assertEquals(MY_TOPIC, mgr.getTopic());
202     }
203
204     @Test
205     public void testGetProperties() {
206         assertEquals(poolProps, mgr.getProperties());
207     }
208
209     @Test
210     public void testBeforeStart() throws Exception {
211         // not running yet
212         mgr.beforeStart();
213
214         verify(dmaap).startPublisher();
215
216         assertEquals(1, schedCount);
217         verify(sched).setMaximumPoolSize(1);
218         verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
219
220
221         // try again - nothing should happen
222         mgr.beforeStart();
223
224         verify(dmaap).startPublisher();
225
226         assertEquals(1, schedCount);
227         verify(sched).setMaximumPoolSize(1);
228         verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
229     }
230
231     @Test
232     public void testAfterStart() throws Exception {
233         startMgr();
234
235         verify(dmaap).startConsumer(mgr);
236
237         State st = mgr.getCurrent();
238         assertTrue(st instanceof StartState);
239
240         // ensure the state is attached to the manager
241         assertEquals(mgr.getHost(), st.getHost());
242
243         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
244         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
245         verify(sched).schedule(any(Runnable.class), timeCap.capture(), unitCap.capture());
246
247         assertEquals(STD_HEARTBEAT_WAIT_MS, timeCap.getValue().longValue());
248         assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
249
250
251         // already started - nothing else happens
252         mgr.afterStart();
253
254         verify(dmaap).startConsumer(mgr);
255
256         assertTrue(mgr.getCurrent() instanceof StartState);
257
258         verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
259     }
260
261     @Test
262     public void testBeforeStop() throws Exception {
263         startMgr();
264         mgr.startDistributing(makeAssignments(false));
265
266         verify(dmaap, times(START_PUB)).publish(any());
267
268         mgr.beforeStop();
269
270         verify(dmaap).stopConsumer(mgr);
271         verify(sched).shutdownNow();
272         verify(dmaap, times(START_PUB + 1)).publish(any());
273         verify(dmaap).publish(contains("offline"));
274
275         assertTrue(mgr.getCurrent() instanceof IdleState);
276
277         // verify that next message is handled locally
278         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
279         verify(dmaap, times(START_PUB + 1)).publish(any());
280     }
281
282     @Test
283     public void testBeforeStop_NotRunning() throws Exception {
284         final State st = mgr.getCurrent();
285
286         mgr.beforeStop();
287
288         verify(dmaap, never()).stopConsumer(any());
289         verify(sched, never()).shutdownNow();
290
291         // hasn't changed states either
292         assertEquals(st, mgr.getCurrent());
293     }
294
295     @Test
296     public void testBeforeStop_AfterPartialStart() throws Exception {
297         // call beforeStart but not afterStart
298         mgr.beforeStart();
299
300         final State st = mgr.getCurrent();
301
302         mgr.beforeStop();
303
304         // should still shut the scheduler down
305         verify(sched).shutdownNow();
306
307         verify(dmaap, never()).stopConsumer(any());
308
309         // hasn't changed states
310         assertEquals(st, mgr.getCurrent());
311     }
312
313     @Test
314     public void testAfterStop() throws Exception {
315         startMgr();
316         mgr.beforeStop();
317
318         mgr.afterStop();
319
320         verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
321     }
322
323     @Test
324     public void testBeforeLock() throws Exception {
325         startMgr();
326
327         mgr.beforeLock();
328
329         assertTrue(mgr.getCurrent() instanceof IdleState);
330     }
331
332     @Test
333     public void testAfterUnlock_AliveIdle() throws Exception {
334         // this really shouldn't happen
335
336         lockMgr();
337
338         mgr.afterUnlock();
339
340         // stays in idle state, because it has no scheduler
341         assertTrue(mgr.getCurrent() instanceof IdleState);
342     }
343
344     @Test
345     public void testAfterUnlock_AliveStarted() throws Exception {
346         startMgr();
347         lockMgr();
348
349         mgr.afterUnlock();
350
351         assertTrue(mgr.getCurrent() instanceof StartState);
352     }
353
354     @Test
355     public void testAfterUnlock_StoppedIdle() throws Exception {
356         startMgr();
357         lockMgr();
358
359         // controller is stopped
360         when(controller.isAlive()).thenReturn(false);
361
362         mgr.afterUnlock();
363
364         assertTrue(mgr.getCurrent() instanceof IdleState);
365     }
366
367     @Test
368     public void testAfterUnlock_StoppedStarted() throws Exception {
369         startMgr();
370
371         // Note: don't lockMgr()
372
373         // controller is stopped
374         when(controller.isAlive()).thenReturn(false);
375
376         mgr.afterUnlock();
377
378         assertTrue(mgr.getCurrent() instanceof StartState);
379     }
380
381     @Test
382     public void testChangeState() throws Exception {
383         // start should invoke changeState()
384         startMgr();
385
386         /*
387          * now go offline while it's locked
388          */
389         lockMgr();
390
391         // should have cancelled the timers
392         assertEquals(2, futures.size());
393         verify(futures.poll()).cancel(false);
394         verify(futures.poll()).cancel(false);
395
396         /*
397          * now go back online
398          */
399         unlockMgr();
400
401         // new timers should now be active
402         assertEquals(2, futures.size());
403         verify(futures.poll(), never()).cancel(false);
404         verify(futures.poll(), never()).cancel(false);
405     }
406
407     @Test
408     public void testSchedule() throws Exception {
409         // must start the scheduler
410         startMgr();
411
412         CountDownLatch latch = new CountDownLatch(1);
413
414         mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
415             latch.countDown();
416             return null;
417         });
418
419         // capture the task
420         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
421         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
422         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
423
424         verify(sched, times(2)).schedule(taskCap.capture(), timeCap.capture(), unitCap.capture());
425
426         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
427         assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
428
429         // execute it
430         taskCap.getValue().run();
431
432         assertEquals(0, latch.getCount());
433     }
434
435     @Test
436     public void testScheduleWithFixedDelay() throws Exception {
437         // must start the scheduler
438         startMgr();
439
440         CountDownLatch latch = new CountDownLatch(1);
441
442         mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
443             latch.countDown();
444             return null;
445         });
446
447         // capture the task
448         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
449         ArgumentCaptor<Long> initCap = ArgumentCaptor.forClass(Long.class);
450         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
451         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
452
453         verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
454                         unitCap.capture());
455
456         assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
457         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
458         assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
459
460         // execute it
461         taskCap.getValue().run();
462
463         assertEquals(0, latch.getCount());
464     }
465
466     @Test
467     public void testPublishAdmin() throws Exception {
468         Offline msg = new Offline(mgr.getHost());
469         mgr.publishAdmin(msg);
470
471         assertEquals(Message.ADMIN, msg.getChannel());
472
473         verify(dmaap).publish(any());
474     }
475
476     @Test
477     public void testPublish() throws Exception {
478         Offline msg = new Offline(mgr.getHost());
479         mgr.publish("my.channel", msg);
480
481         assertEquals("my.channel", msg.getChannel());
482
483         verify(dmaap).publish(any());
484     }
485
486     @Test
487     public void testPublish_InvalidMsg() throws Exception {
488         // message is missing data
489         mgr.publish(Message.ADMIN, new Offline());
490
491         // should not have attempted to publish it
492         verify(dmaap, never()).publish(any());
493     }
494
495     @Test
496     public void testPublish_DmaapEx() throws Exception {
497
498         // generate exception
499         doThrow(new PoolingFeatureException()).when(dmaap).publish(any());
500
501         assertThatCode(() -> mgr.publish(Message.ADMIN, new Offline(mgr.getHost()))).doesNotThrowAnyException();
502     }
503
504     @Test
505     public void testOnTopicEvent() throws Exception {
506         startMgr();
507
508         StartState st = (StartState) mgr.getCurrent();
509
510         /*
511          * give it its heart beat, that should cause it to transition to the Query state.
512          */
513         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
514         hb.setChannel(Message.ADMIN);
515
516         String msg = ser.encodeMsg(hb);
517
518         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
519
520         assertTrue(mgr.getCurrent() instanceof QueryState);
521     }
522
523     @Test
524     public void testOnTopicEvent_NullEvent() throws Exception {
525         startMgr();
526
527         assertThatCode(() -> mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null)).doesNotThrowAnyException();
528     }
529
530     @Test
531     public void testBeforeOffer_Unlocked() throws Exception {
532         startMgr();
533
534         // route the message to another host
535         mgr.startDistributing(makeAssignments(false));
536
537         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
538     }
539
540     @Test
541     public void testBeforeOffer_Locked() throws Exception {
542         startMgr();
543         lockMgr();
544
545         // route the message to another host
546         mgr.startDistributing(makeAssignments(false));
547
548         assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
549     }
550
551     @Test
552     public void testBeforeInsert() throws Exception {
553         startMgr();
554         lockMgr();
555
556         // route the message to this host
557         mgr.startDistributing(makeAssignments(true));
558
559         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
560     }
561
562     @Test
563     public void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
564         validateHandleReqId(null);
565     }
566
567     @Test
568     public void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
569         validateHandleReqId("");
570     }
571
572     @Test
573     public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
574         startMgr();
575
576         assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
577     }
578
579     @Test
580     public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
581         validateUnhandled();
582     }
583
584     @Test
585     public void testHandleExternalForward_NoAssignments() throws Exception {
586         validateUnhandled();
587     }
588
589     @Test
590     public void testHandleExternalForward() throws Exception {
591         validateNoForward();
592     }
593
594     @Test
595     public void testHandleEvent_NullTarget() throws Exception {
596         // buckets have null targets
597         validateDiscarded(new BucketAssignments(new String[] {null, null}));
598     }
599
600     @Test
601     public void testHandleEvent_SameHost() throws Exception {
602         validateNoForward();
603     }
604
605     @Test
606     public void testHandleEvent_DiffHost() throws Exception {
607         // route the message to the *OTHER* host
608         validateDiscarded(makeAssignments(false));
609     }
610
611     @Test
612     public void testDecodeEvent_CannotDecode() throws Exception {
613
614         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
615             @Override
616             protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
617                 return false;
618             }
619         };
620
621         startMgr();
622
623         when(controller.isLocked()).thenReturn(true);
624
625         // create assignments, though they are irrelevant
626         mgr.startDistributing(makeAssignments(false));
627
628         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
629     }
630
631     @Test
632     public void testDecodeEvent_UnsuppEx() throws Exception {
633
634         // generate exception
635         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
636             @Override
637             protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
638                 throw new UnsupportedOperationException();
639             }
640         };
641
642         startMgr();
643
644         when(controller.isLocked()).thenReturn(true);
645
646         // create assignments, though they are irrelevant
647         mgr.startDistributing(makeAssignments(false));
648
649         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
650     }
651
652     @Test
653     public void testDecodeEvent_ArgEx() throws Exception {
654         // generate exception
655         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
656             @Override
657             protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
658                 throw new IllegalArgumentException();
659             }
660         };
661
662         startMgr();
663
664         when(controller.isLocked()).thenReturn(true);
665
666         // create assignments, though they are irrelevant
667         mgr.startDistributing(makeAssignments(false));
668
669         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
670     }
671
672     @Test
673     public void testDecodeEvent_StateEx() throws Exception {
674         // generate exception
675         mgr = new PoolingManagerTest(MY_HOST, controller, poolProps, active) {
676             @Override
677             protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
678                 throw new IllegalStateException();
679             }
680         };
681
682         startMgr();
683
684         when(controller.isLocked()).thenReturn(true);
685
686         // create assignments, though they are irrelevant
687         mgr.startDistributing(makeAssignments(false));
688
689         assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
690     }
691
692     @Test
693     public void testDecodeEvent() throws Exception {
694         startMgr();
695
696         when(controller.isLocked()).thenReturn(true);
697
698         // route to another host
699         mgr.startDistributing(makeAssignments(false));
700
701         assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
702     }
703
704     @Test
705     public void testHandleInternal() throws Exception {
706         startMgr();
707
708         StartState st = (StartState) mgr.getCurrent();
709
710         /*
711          * give it its heart beat, that should cause it to transition to the Query state.
712          */
713         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
714         hb.setChannel(Message.ADMIN);
715
716         String msg = ser.encodeMsg(hb);
717
718         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
719
720         assertTrue(mgr.getCurrent() instanceof QueryState);
721     }
722
723     @Test
724     public void testHandleInternal_IoEx() throws Exception {
725         startMgr();
726
727         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
728
729         assertTrue(mgr.getCurrent() instanceof StartState);
730     }
731
732     @Test
733     public void testHandleInternal_PoolEx() throws Exception {
734         startMgr();
735
736         StartState st = (StartState) mgr.getCurrent();
737
738         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
739
740         /*
741          * do NOT set the channel - this will cause the message to be invalid, triggering
742          * an exception
743          */
744
745         String msg = ser.encodeMsg(hb);
746
747         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
748
749         assertTrue(mgr.getCurrent() instanceof StartState);
750     }
751
752     @Test
753     public void testStartDistributing() throws Exception {
754         validateNoForward();
755
756
757         // null assignments should cause message to be processed locally
758         mgr.startDistributing(null);
759         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
760         verify(dmaap, times(START_PUB)).publish(any());
761
762
763         // message for this host
764         mgr.startDistributing(makeAssignments(true));
765         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
766
767
768         // message for another host
769         mgr.startDistributing(makeAssignments(false));
770         assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
771     }
772
773     @Test
774     public void testGoStart() {
775         State st = mgr.goStart();
776         assertTrue(st instanceof StartState);
777         assertEquals(mgr.getHost(), st.getHost());
778     }
779
780     @Test
781     public void testGoQuery() {
782         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
783         mgr.startDistributing(asgn);
784
785         State st = mgr.goQuery();
786
787         assertTrue(st instanceof QueryState);
788         assertEquals(mgr.getHost(), st.getHost());
789         assertEquals(asgn, mgr.getAssignments());
790     }
791
792     @Test
793     public void testGoActive() {
794         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
795         mgr.startDistributing(asgn);
796
797         State st = mgr.goActive();
798
799         assertTrue(st instanceof ActiveState);
800         assertEquals(mgr.getHost(), st.getHost());
801         assertEquals(asgn, mgr.getAssignments());
802         assertEquals(0, active.getCount());
803     }
804
805     @Test
806     public void testGoInactive() {
807         State st = mgr.goInactive();
808         assertTrue(st instanceof InactiveState);
809         assertEquals(mgr.getHost(), st.getHost());
810         assertEquals(1, active.getCount());
811     }
812
813     @Test
814     public void testTimerActionRun() throws Exception {
815         // must start the scheduler
816         startMgr();
817
818         CountDownLatch latch = new CountDownLatch(1);
819
820         mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
821             latch.countDown();
822             return null;
823         });
824
825         // capture the task
826         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
827
828         verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
829
830         // execute it
831         taskCap.getValue().run();
832
833         assertEquals(0, latch.getCount());
834     }
835
836     @Test
837     public void testTimerActionRun_DiffState() throws Exception {
838         // must start the scheduler
839         startMgr();
840
841         CountDownLatch latch = new CountDownLatch(1);
842
843         mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
844             latch.countDown();
845             return null;
846         });
847
848         // capture the task
849         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
850
851         verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
852
853         // give it a heartbeat so that it transitions to the query state
854         StartState st = (StartState) mgr.getCurrent();
855         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
856         hb.setChannel(Message.ADMIN);
857
858         String msg = ser.encodeMsg(hb);
859
860         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
861
862         assertTrue(mgr.getCurrent() instanceof QueryState);
863
864         // execute it
865         taskCap.getValue().run();
866
867         // it should NOT have counted down
868         assertEquals(1, latch.getCount());
869     }
870
871     private void validateHandleReqId(String requestId) throws PoolingFeatureException {
872         startMgr();
873
874         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
875     }
876
877     private void validateNoForward() throws PoolingFeatureException {
878         startMgr();
879
880         // route the message to this host
881         mgr.startDistributing(makeAssignments(true));
882
883         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
884
885         verify(dmaap, times(START_PUB)).publish(any());
886     }
887
888     private void validateUnhandled() throws PoolingFeatureException {
889         startMgr();
890         assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
891     }
892
893     private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
894         startMgr();
895
896         // buckets have null targets
897         mgr.startDistributing(bucketAssignments);
898
899         assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
900     }
901
902     /**
903      * Makes an assignment with two buckets.
904      *
905      * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the
906      *        manager's bucket, {@code false} if it should hash to the other host's bucket
907      * @return a new bucket assignment
908      */
909     private BucketAssignments makeAssignments(boolean sameHost) {
910         int slot = DECODED_EVENT.hashCode() % 2;
911
912         // slot numbers are 0 and 1 - reverse them if it's for a different host
913         if (!sameHost) {
914             slot = 1 - slot;
915         }
916
917         String[] asgn = new String[2];
918         asgn[slot] = mgr.getHost();
919         asgn[1 - slot] = HOST2;
920
921         return new BucketAssignments(asgn);
922     }
923
924     /**
925      * Invokes methods necessary to start the manager.
926      *
927      * @throws PoolingFeatureException if an error occurs
928      */
929     private void startMgr() throws PoolingFeatureException {
930         mgr.beforeStart();
931         mgr.afterStart();
932     }
933
934     /**
935      * Invokes methods necessary to lock the manager.
936      */
937     private void lockMgr() {
938         mgr.beforeLock();
939         when(controller.isLocked()).thenReturn(true);
940     }
941
942     /**
943      * Invokes methods necessary to unlock the manager.
944      */
945     private void unlockMgr() {
946         mgr.afterUnlock();
947         when(controller.isLocked()).thenReturn(false);
948     }
949
950     /**
951      * Used to create a mock object that implements both super interfaces.
952      */
953     private static interface ListeningController extends TopicListener, PolicyController {
954
955     }
956
957     /**
958      * Manager with overrides.
959      */
960     private class PoolingManagerTest extends PoolingManagerImpl {
961
962         public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
963                         CountDownLatch activeLatch) {
964
965             super(host, controller, props, activeLatch);
966         }
967
968         @Override
969         protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
970             gotDmaap = true;
971             return dmaap;
972         }
973
974         @Override
975         protected ScheduledThreadPoolExecutor makeScheduler() {
976             ++schedCount;
977             return sched;
978         }
979
980         @Override
981         protected boolean canDecodeEvent(DroolsController drools2, String topic2) {
982             return (drools2 == drools && TOPIC2.equals(topic2));
983         }
984
985         @Override
986         protected Object decodeEventWrapper(DroolsController drools2, String topic2, String event) {
987             if (drools2 == drools && TOPIC2.equals(topic2) && event == THE_EVENT) {
988                 return DECODED_EVENT;
989             } else {
990                 return null;
991             }
992         }
993     }
994 }