Merge "Close old UEB/DMaaP consumer"
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / PoolingManagerImplTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.junit.Assert.fail;
29 import static org.mockito.ArgumentMatchers.any;
30 import static org.mockito.ArgumentMatchers.contains;
31 import static org.mockito.Mockito.doAnswer;
32 import static org.mockito.Mockito.doThrow;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.never;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.when;
38 import java.util.LinkedList;
39 import java.util.Properties;
40 import java.util.Queue;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ScheduledFuture;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.onap.policy.drools.controller.DroolsController;
51 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
52 import org.onap.policy.drools.event.comm.TopicListener;
53 import org.onap.policy.drools.pooling.PoolingManagerImpl.Factory;
54 import org.onap.policy.drools.pooling.extractor.ClassExtractors;
55 import org.onap.policy.drools.pooling.message.BucketAssignments;
56 import org.onap.policy.drools.pooling.message.Forward;
57 import org.onap.policy.drools.pooling.message.Heartbeat;
58 import org.onap.policy.drools.pooling.message.Message;
59 import org.onap.policy.drools.pooling.message.Offline;
60 import org.onap.policy.drools.pooling.state.ActiveState;
61 import org.onap.policy.drools.pooling.state.IdleState;
62 import org.onap.policy.drools.pooling.state.InactiveState;
63 import org.onap.policy.drools.pooling.state.QueryState;
64 import org.onap.policy.drools.pooling.state.StartState;
65 import org.onap.policy.drools.pooling.state.State;
66 import org.onap.policy.drools.system.PolicyController;
67
68 public class PoolingManagerImplTest {
69
70     protected static final long STD_HEARTBEAT_WAIT_MS = 10;
71     protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
72     protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
73     protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
74     protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
75     protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
76
77     private static final String MY_HOST = "my.host";
78     private static final String HOST2 = "other.host";
79
80     private static final String MY_CONTROLLER = "my.controller";
81     private static final String MY_TOPIC = "my.topic";
82
83     private static final String TOPIC2 = "topic.two";
84
85     private static final String THE_EVENT = "the event";
86
87     private static final Object DECODED_EVENT = new Object();
88     private static final String REQUEST_ID = "my.request.id";
89
90     /**
91      * Number of dmaap.publish() invocations that should be issued when the manager is
92      * started.
93      */
94     private static final int START_PUB = 1;
95
96     /**
97      * Saved from PoolingManagerImpl and restored on exit from this test class.
98      */
99     private static Factory saveFactory;
100
101     /**
102      * Futures that have been allocated due to calls to scheduleXxx().
103      */
104     private Queue<ScheduledFuture<?>> futures;
105
106     private Properties plainProps;
107     private PoolingProperties poolProps;
108     private ListeningController controller;
109     private EventQueue eventQueue;
110     private ClassExtractors extractors;
111     private DmaapManager dmaap;
112     private ScheduledThreadPoolExecutor sched;
113     private DroolsController drools;
114     private Serializer ser;
115     private Factory factory;
116     private CountDownLatch active;
117
118     private PoolingManagerImpl mgr;
119
120     @BeforeClass
121     public static void setUpBeforeClass() {
122         saveFactory = PoolingManagerImpl.getFactory();
123     }
124
125     @AfterClass
126     public static void tearDownAfterClass() {
127         PoolingManagerImpl.setFactory(saveFactory);
128     }
129
130     @Before
131     public void setUp() throws Exception {
132         plainProps = new Properties();
133
134         poolProps = mock(PoolingProperties.class);
135         when(poolProps.getSource()).thenReturn(plainProps);
136         when(poolProps.getPoolingTopic()).thenReturn(MY_TOPIC);
137         when(poolProps.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
138         when(poolProps.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
139         when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
140         when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
141         when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
142         when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
143
144         futures = new LinkedList<>();
145         ser = new Serializer();
146         active = new CountDownLatch(1);
147
148         factory = mock(Factory.class);
149         eventQueue = mock(EventQueue.class);
150         extractors = mock(ClassExtractors.class);
151         dmaap = mock(DmaapManager.class);
152         controller = mock(ListeningController.class);
153         sched = mock(ScheduledThreadPoolExecutor.class);
154         drools = mock(DroolsController.class);
155
156         when(factory.makeEventQueue(any())).thenReturn(eventQueue);
157         when(factory.makeClassExtractors(any())).thenReturn(extractors);
158         when(factory.makeDmaapManager(any(), any())).thenReturn(dmaap);
159         when(factory.makeScheduler()).thenReturn(sched);
160         when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true);
161         when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT);
162
163         when(extractors.extract(DECODED_EVENT)).thenReturn(REQUEST_ID);
164
165         when(controller.getName()).thenReturn(MY_CONTROLLER);
166         when(controller.getDrools()).thenReturn(drools);
167         when(controller.isAlive()).thenReturn(true);
168
169         when(sched.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))).thenAnswer(args -> {
170             ScheduledFuture<?> fut = mock(ScheduledFuture.class);
171             futures.add(fut);
172
173             return fut;
174         });
175
176         when(sched.scheduleWithFixedDelay(any(Runnable.class), any(Long.class), any(Long.class), any(TimeUnit.class)))
177                         .thenAnswer(args -> {
178                             ScheduledFuture<?> fut = mock(ScheduledFuture.class);
179                             futures.add(fut);
180
181                             return fut;
182                         });
183
184         PoolingManagerImpl.setFactory(factory);
185
186         mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps, active);
187     }
188
189     @Test
190     public void testPoolingManagerImpl() throws Exception {
191         verify(factory).makeDmaapManager(any(), any());
192
193         State st = mgr.getCurrent();
194         assertTrue(st instanceof IdleState);
195
196         // ensure the state is attached to the manager
197         assertEquals(mgr.getHost(), st.getHost());
198     }
199
200     @Test
201     public void testPoolingManagerImpl_ClassEx() {
202         /*
203          * this controller does not implement TopicListener, which should cause a
204          * ClassCastException
205          */
206         PolicyController ctlr = mock(PolicyController.class);
207
208         PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class,
209                         () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps, active));
210         assertNotNull(ex.getCause());
211         assertTrue(ex.getCause() instanceof ClassCastException);
212     }
213
214     @Test
215     public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
216         // throw an exception when we try to create the dmaap manager
217         PoolingFeatureException ex = new PoolingFeatureException();
218         when(factory.makeDmaapManager(any(), any())).thenThrow(ex);
219
220         PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
221                         () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active));
222         assertEquals(ex, ex2.getCause());
223     }
224
225     @Test
226     public void testGetCurrent() throws Exception {
227         assertEquals(IdleState.class, mgr.getCurrent().getClass());
228
229         startMgr();
230
231         assertEquals(StartState.class, mgr.getCurrent().getClass());
232     }
233
234     @Test
235     public void testGetHost() {
236         assertEquals(MY_HOST, mgr.getHost());
237
238         mgr = new PoolingManagerImpl(HOST2, controller, poolProps, active);
239         assertEquals(HOST2, mgr.getHost());
240     }
241
242     @Test
243     public void testGetTopic() {
244         assertEquals(MY_TOPIC, mgr.getTopic());
245     }
246
247     @Test
248     public void testGetProperties() {
249         assertEquals(poolProps, mgr.getProperties());
250     }
251
252     @Test
253     public void testBeforeStart() throws Exception {
254         // not running yet
255         mgr.beforeStart();
256
257         verify(dmaap).startPublisher();
258
259         verify(factory).makeScheduler();
260         verify(sched).setMaximumPoolSize(1);
261         verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
262
263
264         // try again - nothing should happen
265         mgr.beforeStart();
266
267         verify(dmaap).startPublisher();
268
269         verify(factory).makeScheduler();
270         verify(sched).setMaximumPoolSize(1);
271         verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
272     }
273
274     @Test
275     public void testBeforeStart_DmaapEx() throws Exception {
276         // generate an exception
277         PoolingFeatureException ex = new PoolingFeatureException();
278         doThrow(ex).when(dmaap).startPublisher();
279
280         PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, () -> mgr.beforeStart());
281         assertEquals(ex, ex2);
282
283         // should never start the scheduler
284         verify(factory, never()).makeScheduler();
285     }
286
287     @Test
288     public void testAfterStart() throws Exception {
289         startMgr();
290
291         verify(dmaap).startConsumer(mgr);
292
293         State st = mgr.getCurrent();
294         assertTrue(st instanceof StartState);
295
296         // ensure the state is attached to the manager
297         assertEquals(mgr.getHost(), st.getHost());
298
299         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
300         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
301         verify(sched).schedule(any(Runnable.class), timeCap.capture(), unitCap.capture());
302
303         assertEquals(STD_HEARTBEAT_WAIT_MS, timeCap.getValue().longValue());
304         assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
305
306
307         // already started - nothing else happens
308         mgr.afterStart();
309
310         verify(dmaap).startConsumer(mgr);
311
312         assertTrue(mgr.getCurrent() instanceof StartState);
313
314         verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
315     }
316
317     @Test
318     public void testBeforeStop() throws Exception {
319         startMgr();
320
321         mgr.beforeStop();
322
323         verify(dmaap).stopConsumer(mgr);
324         verify(sched).shutdownNow();
325         verify(dmaap).publish(contains("offline"));
326
327         assertTrue(mgr.getCurrent() instanceof IdleState);
328     }
329
330     @Test
331     public void testBeforeStop_NotRunning() throws Exception {
332         State st = mgr.getCurrent();
333
334         mgr.beforeStop();
335
336         verify(dmaap, never()).stopConsumer(any());
337         verify(sched, never()).shutdownNow();
338
339         // hasn't changed states either
340         assertEquals(st, mgr.getCurrent());
341     }
342
343     @Test
344     public void testBeforeStop_AfterPartialStart() throws Exception {
345         // call beforeStart but not afterStart
346         mgr.beforeStart();
347
348         State st = mgr.getCurrent();
349
350         mgr.beforeStop();
351
352         // should still shut the scheduler down
353         verify(sched).shutdownNow();
354
355         verify(dmaap, never()).stopConsumer(any());
356
357         // hasn't changed states
358         assertEquals(st, mgr.getCurrent());
359     }
360
361     @Test
362     public void testAfterStop() throws Exception {
363         startMgr();
364         mgr.beforeStop();
365
366         when(eventQueue.isEmpty()).thenReturn(false);
367         when(eventQueue.size()).thenReturn(3);
368
369         mgr.afterStop();
370
371         verify(eventQueue).clear();
372         verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
373     }
374
375     @Test
376     public void testAfterStop_EmptyQueue() throws Exception {
377         startMgr();
378         mgr.beforeStop();
379
380         when(eventQueue.isEmpty()).thenReturn(true);
381         when(eventQueue.size()).thenReturn(0);
382
383         mgr.afterStop();
384
385         verify(eventQueue, never()).clear();
386         verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
387     }
388
389     @Test
390     public void testBeforeLock() throws Exception {
391         startMgr();
392
393         mgr.beforeLock();
394
395         assertTrue(mgr.getCurrent() instanceof IdleState);
396     }
397
398     @Test
399     public void testAfterUnlock_AliveIdle() throws Exception {
400         // this really shouldn't happen
401
402         lockMgr();
403
404         mgr.afterUnlock();
405
406         // stays in idle state, because it has no scheduler
407         assertTrue(mgr.getCurrent() instanceof IdleState);
408     }
409
410     @Test
411     public void testAfterUnlock_AliveStarted() throws Exception {
412         startMgr();
413         lockMgr();
414
415         mgr.afterUnlock();
416
417         assertTrue(mgr.getCurrent() instanceof StartState);
418     }
419
420     @Test
421     public void testAfterUnlock_StoppedIdle() throws Exception {
422         startMgr();
423         lockMgr();
424
425         // controller is stopped
426         when(controller.isAlive()).thenReturn(false);
427
428         mgr.afterUnlock();
429
430         assertTrue(mgr.getCurrent() instanceof IdleState);
431     }
432
433     @Test
434     public void testAfterUnlock_StoppedStarted() throws Exception {
435         startMgr();
436
437         // Note: don't lockMgr()
438
439         // controller is stopped
440         when(controller.isAlive()).thenReturn(false);
441
442         mgr.afterUnlock();
443
444         assertTrue(mgr.getCurrent() instanceof StartState);
445     }
446
447     @Test
448     public void testChangeState() throws Exception {
449         // start should invoke changeState()
450         startMgr();
451
452         int ntimes = 0;
453
454         // should have set the filter for the StartState
455         verify(dmaap, times(++ntimes)).setFilter(any());
456
457         /*
458          * now go offline while it's locked
459          */
460         lockMgr();
461
462         // should have set the new filter
463         verify(dmaap, times(++ntimes)).setFilter(any());
464
465         // should have cancelled the timers
466         assertEquals(2, futures.size());
467         verify(futures.poll()).cancel(false);
468         verify(futures.poll()).cancel(false);
469
470         /*
471          * now go back online
472          */
473         unlockMgr();
474
475         // should have set the new filter
476         verify(dmaap, times(++ntimes)).setFilter(any());
477
478         // new timers should now be active
479         assertEquals(2, futures.size());
480         verify(futures.poll(), never()).cancel(false);
481         verify(futures.poll(), never()).cancel(false);
482     }
483
484     @Test
485     public void testSetFilter() throws Exception {
486         // start should cause a filter to be set
487         startMgr();
488
489         verify(dmaap).setFilter(any());
490     }
491
492     @Test
493     public void testSetFilter_DmaapEx() throws Exception {
494
495         // generate an exception
496         doThrow(new PoolingFeatureException()).when(dmaap).setFilter(any());
497
498         // start should invoke setFilter()
499         startMgr();
500
501         // no exception, means success
502     }
503
504     @Test
505     public void testInternalTopicFailed() throws Exception {
506         startMgr();
507
508         CountDownLatch latch = mgr.internalTopicFailed();
509
510         // wait for the thread to complete
511         assertTrue(latch.await(2, TimeUnit.SECONDS));
512
513         verify(controller).stop();
514     }
515
516     @Test
517     public void testSchedule() throws Exception {
518         // must start the scheduler
519         startMgr();
520
521         CountDownLatch latch = new CountDownLatch(1);
522
523         mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
524             latch.countDown();
525             return null;
526         });
527
528         // capture the task
529         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
530         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
531         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
532
533         verify(sched, times(2)).schedule(taskCap.capture(), timeCap.capture(), unitCap.capture());
534
535         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
536         assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
537
538         // execute it
539         taskCap.getValue().run();
540
541         assertEquals(0, latch.getCount());
542     }
543
544     @Test
545     public void testScheduleWithFixedDelay() throws Exception {
546         // must start the scheduler
547         startMgr();
548
549         CountDownLatch latch = new CountDownLatch(1);
550
551         mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
552             latch.countDown();
553             return null;
554         });
555
556         // capture the task
557         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
558         ArgumentCaptor<Long> initCap = ArgumentCaptor.forClass(Long.class);
559         ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
560         ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
561
562         verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
563                         unitCap.capture());
564
565         assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
566         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
567         assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
568
569         // execute it
570         taskCap.getValue().run();
571
572         assertEquals(0, latch.getCount());
573     }
574
575     @Test
576     public void testPublishAdmin() throws Exception {
577         Offline msg = new Offline(mgr.getHost());
578         mgr.publishAdmin(msg);
579
580         assertEquals(Message.ADMIN, msg.getChannel());
581
582         verify(dmaap).publish(any());
583     }
584
585     @Test
586     public void testPublish() throws Exception {
587         Offline msg = new Offline(mgr.getHost());
588         mgr.publish("my.channel", msg);
589
590         assertEquals("my.channel", msg.getChannel());
591
592         verify(dmaap).publish(any());
593     }
594
595     @Test
596     public void testPublish_InvalidMsg() throws Exception {
597         // message is missing data
598         mgr.publish(Message.ADMIN, new Offline());
599
600         // should not have attempted to publish it
601         verify(dmaap, never()).publish(any());
602     }
603
604     @Test
605     public void testPublish_DmaapEx() throws Exception {
606
607         // generate exception
608         doThrow(new PoolingFeatureException()).when(dmaap).publish(any());
609
610         mgr.publish(Message.ADMIN, new Offline(mgr.getHost()));
611     }
612
613     @Test
614     public void testOnTopicEvent() throws Exception {
615         startMgr();
616
617         StartState st = (StartState) mgr.getCurrent();
618
619         /*
620          * give it its heart beat, that should cause it to transition to the Query state.
621          */
622         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
623         hb.setChannel(Message.ADMIN);
624
625         String msg = ser.encodeMsg(hb);
626
627         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
628
629         assertTrue(mgr.getCurrent() instanceof QueryState);
630     }
631
632     @Test
633     public void testOnTopicEvent_NullEvent() throws Exception {
634         startMgr();
635
636         mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null);
637     }
638
639     @Test
640     public void testBeforeOffer_Unlocked_NoIntercept() throws Exception {
641         startMgr();
642
643         assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
644     }
645
646     @Test
647     public void testBeforeOffer_Locked_NoIntercept() throws Exception {
648         startMgr();
649
650         lockMgr();
651
652         assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
653     }
654
655     @Test
656     public void testBeforeOffer_Locked_Intercept() throws Exception {
657         startMgr();
658         lockMgr();
659
660         // route the message to this host
661         mgr.startDistributing(makeAssignments(true));
662
663         CountDownLatch latch = catchRecursion(false);
664
665         Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
666         mgr.handle(msg);
667
668         verify(dmaap, times(START_PUB)).publish(any());
669         verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
670
671         // ensure we made it past both beforeXxx() methods
672         assertEquals(0, latch.getCount());
673     }
674
675     @Test
676     public void testBeforeInsert_Intercept() throws Exception {
677         startMgr();
678         lockMgr();
679
680         // route the message to this host
681         mgr.startDistributing(makeAssignments(true));
682
683         CountDownLatch latch = catchRecursion(true);
684
685         Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
686         mgr.handle(msg);
687
688         verify(dmaap, times(START_PUB)).publish(any());
689         verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
690
691         // ensure we made it past both beforeXxx() methods
692         assertEquals(0, latch.getCount());
693     }
694
695     @Test
696     public void testBeforeInsert_NoIntercept() throws Exception {
697         startMgr();
698
699         long tbegin = System.currentTimeMillis();
700
701         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
702
703         ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
704         verify(eventQueue).add(msgCap.capture());
705
706         validateMessageContent(tbegin, msgCap.getValue());
707     }
708
709     @Test
710     public void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
711         startMgr();
712
713         when(extractors.extract(any())).thenReturn(null);
714
715         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
716     }
717
718     @Test
719     public void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
720         startMgr();
721
722         when(extractors.extract(any())).thenReturn("");
723
724         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
725     }
726
727     @Test
728     public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
729         startMgr();
730
731         assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
732
733         // should not have tried to enqueue a message
734         verify(eventQueue, never()).add(any());
735     }
736
737     @Test
738     public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
739         startMgr();
740
741         long tbegin = System.currentTimeMillis();
742
743         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
744
745         ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
746         verify(eventQueue).add(msgCap.capture());
747
748         validateMessageContent(tbegin, msgCap.getValue());
749     }
750
751     @Test
752     public void testHandleExternalForward_NoAssignments() throws Exception {
753         startMgr();
754
755         long tbegin = System.currentTimeMillis();
756
757         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
758
759         ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
760         verify(eventQueue).add(msgCap.capture());
761
762         validateMessageContent(tbegin, msgCap.getValue());
763     }
764
765     @Test
766     public void testHandleExternalForward() throws Exception {
767         startMgr();
768
769         // route the message to this host
770         mgr.startDistributing(makeAssignments(true));
771
772         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
773     }
774
775     @Test
776     public void testHandleEvent_NullTarget() throws Exception {
777         startMgr();
778
779         // buckets have null targets
780         mgr.startDistributing(new BucketAssignments(new String[] {null, null}));
781
782         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
783
784         verify(dmaap, times(START_PUB)).publish(any());
785     }
786
787     @Test
788     public void testHandleEvent_SameHost() throws Exception {
789         startMgr();
790
791         // route the message to this host
792         mgr.startDistributing(makeAssignments(true));
793
794         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
795
796         verify(dmaap, times(START_PUB)).publish(any());
797     }
798
799     @Test
800     public void testHandleEvent_DiffHost_TooManyHops() throws Exception {
801         startMgr();
802
803         // route the message to this host
804         mgr.startDistributing(makeAssignments(false));
805
806         Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
807         msg.setNumHops(PoolingManagerImpl.MAX_HOPS + 1);
808         mgr.handle(msg);
809
810         // shouldn't publish
811         verify(dmaap, times(START_PUB)).publish(any());
812         verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
813     }
814
815     @Test
816     public void testHandleEvent_DiffHost_Forward() throws Exception {
817         startMgr();
818
819         // route the message to the *OTHER* host
820         mgr.startDistributing(makeAssignments(false));
821
822         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
823
824         verify(dmaap, times(START_PUB + 1)).publish(any());
825     }
826
827     @Test
828     public void testExtractRequestId_NullEvent() throws Exception {
829         startMgr();
830
831         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, null));
832     }
833
834     @Test
835     public void testExtractRequestId_NullReqId() throws Exception {
836         startMgr();
837
838         when(extractors.extract(any())).thenReturn(null);
839
840         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
841     }
842
843     @Test
844     public void testExtractRequestId() throws Exception {
845         startMgr();
846
847         // route the message to the *OTHER* host
848         mgr.startDistributing(makeAssignments(false));
849
850         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
851     }
852
853     @Test
854     public void testDecodeEvent_CannotDecode() throws Exception {
855         startMgr();
856
857         when(controller.isLocked()).thenReturn(true);
858
859         // create assignments, though they are irrelevant
860         mgr.startDistributing(makeAssignments(false));
861
862         when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(false);
863
864         assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
865     }
866
867     @Test
868     public void testDecodeEvent_UnsuppEx() throws Exception {
869         startMgr();
870
871         when(controller.isLocked()).thenReturn(true);
872
873         // create assignments, though they are irrelevant
874         mgr.startDistributing(makeAssignments(false));
875
876         // generate exception
877         doThrow(new UnsupportedOperationException()).when(factory).decodeEvent(drools, TOPIC2, THE_EVENT);
878
879         assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
880     }
881
882     @Test
883     public void testDecodeEvent_ArgEx() throws Exception {
884         startMgr();
885
886         when(controller.isLocked()).thenReturn(true);
887
888         // create assignments, though they are irrelevant
889         mgr.startDistributing(makeAssignments(false));
890
891         // generate exception
892         doThrow(new IllegalArgumentException()).when(factory).decodeEvent(drools, TOPIC2, THE_EVENT);
893
894         assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
895     }
896
897     @Test
898     public void testDecodeEvent_StateEx() throws Exception {
899         startMgr();
900
901         when(controller.isLocked()).thenReturn(true);
902
903         // create assignments, though they are irrelevant
904         mgr.startDistributing(makeAssignments(false));
905
906         // generate exception
907         doThrow(new IllegalStateException()).when(factory).decodeEvent(drools, TOPIC2, THE_EVENT);
908
909         assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
910     }
911
912     @Test
913     public void testDecodeEvent() throws Exception {
914         startMgr();
915
916         when(controller.isLocked()).thenReturn(true);
917
918         // route to another host
919         mgr.startDistributing(makeAssignments(false));
920
921         assertTrue(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
922     }
923
924     @Test
925     public void testMakeForward() throws Exception {
926         startMgr();
927
928         long tbegin = System.currentTimeMillis();
929
930         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
931
932         ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
933         verify(eventQueue).add(msgCap.capture());
934
935         validateMessageContent(tbegin, msgCap.getValue());
936     }
937
938     @Test
939     public void testMakeForward_InvalidMsg() throws Exception {
940         startMgr();
941
942         assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
943
944         // should not have tried to enqueue a message
945         verify(eventQueue, never()).add(any());
946     }
947
948     @Test
949     public void testHandle_SameHost() throws Exception {
950         startMgr();
951
952         // route the message to this host
953         mgr.startDistributing(makeAssignments(true));
954
955         Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
956         mgr.handle(msg);
957
958         verify(dmaap, times(START_PUB)).publish(any());
959         verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
960     }
961
962     @Test
963     public void testHandle_DiffHost() throws Exception {
964         startMgr();
965
966         // route the message to this host
967         mgr.startDistributing(makeAssignments(false));
968
969         Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
970         mgr.handle(msg);
971
972         verify(dmaap, times(START_PUB + 1)).publish(any());
973         verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
974     }
975
976     @Test
977     public void testInject() throws Exception {
978         startMgr();
979
980         // route the message to this host
981         mgr.startDistributing(makeAssignments(true));
982
983         CountDownLatch latch = catchRecursion(true);
984
985         Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
986         mgr.handle(msg);
987
988         verify(dmaap, times(START_PUB)).publish(any());
989         verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
990
991         // ensure we made it past both beforeXxx() methods
992         assertEquals(0, latch.getCount());
993     }
994
995     @Test
996     public void testInject_Ex() throws Exception {
997         startMgr();
998
999         // route the message to this host
1000         mgr.startDistributing(makeAssignments(true));
1001
1002         // generate RuntimeException when onTopicEvent() is invoked
1003         doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
1004
1005         CountDownLatch latch = catchRecursion(true);
1006
1007         Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
1008         mgr.handle(msg);
1009
1010         verify(dmaap, times(START_PUB)).publish(any());
1011         verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
1012
1013         // ensure we made it past both beforeXxx() methods
1014         assertEquals(0, latch.getCount());
1015     }
1016
1017     @Test
1018     public void testHandleInternal() throws Exception {
1019         startMgr();
1020
1021         StartState st = (StartState) mgr.getCurrent();
1022
1023         /*
1024          * give it its heart beat, that should cause it to transition to the Query state.
1025          */
1026         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
1027         hb.setChannel(Message.ADMIN);
1028
1029         String msg = ser.encodeMsg(hb);
1030
1031         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
1032
1033         assertTrue(mgr.getCurrent() instanceof QueryState);
1034     }
1035
1036     @Test
1037     public void testHandleInternal_IOEx() throws Exception {
1038         startMgr();
1039
1040         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
1041
1042         assertTrue(mgr.getCurrent() instanceof StartState);
1043     }
1044
1045     @Test
1046     public void testHandleInternal_PoolEx() throws Exception {
1047         startMgr();
1048
1049         StartState st = (StartState) mgr.getCurrent();
1050
1051         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
1052
1053         /*
1054          * do NOT set the channel - this will cause the message to be invalid, triggering
1055          * an exception
1056          */
1057
1058         String msg = ser.encodeMsg(hb);
1059
1060         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
1061
1062         assertTrue(mgr.getCurrent() instanceof StartState);
1063     }
1064
1065     @Test
1066     public void testStartDistributing() throws Exception {
1067         startMgr();
1068
1069         // route the message to this host
1070         assertNotNull(mgr.startDistributing(makeAssignments(true)));
1071         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
1072         verify(eventQueue, never()).add(any());
1073
1074
1075         // null assignments should cause message to be queued
1076         assertNull(mgr.startDistributing(null));
1077         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
1078         verify(eventQueue).add(any());
1079
1080
1081         // route the message to this host
1082         assertNotNull(mgr.startDistributing(makeAssignments(true)));
1083         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
1084         verify(eventQueue).add(any());
1085
1086
1087         // route the message to the other host
1088         assertNotNull(mgr.startDistributing(makeAssignments(false)));
1089         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
1090         verify(eventQueue).add(any());
1091     }
1092
1093     @Test
1094     public void testStartDistributing_EventsInQueue_ProcessLocally() throws Exception {
1095         startMgr();
1096
1097         // put items in the queue
1098         LinkedList<Forward> lst = new LinkedList<>();
1099         lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1100         lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1101         lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1102
1103         when(eventQueue.poll()).thenAnswer(args -> lst.poll());
1104
1105         // route the messages to this host
1106         CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
1107         assertTrue(latch.await(2, TimeUnit.SECONDS));
1108
1109         // all of the events should have been processed locally
1110         verify(dmaap, times(START_PUB)).publish(any());
1111         verify(controller, times(3)).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
1112     }
1113
1114     @Test
1115     public void testStartDistributing_EventsInQueue_Forward() throws Exception {
1116         startMgr();
1117
1118         // put items in the queue
1119         LinkedList<Forward> lst = new LinkedList<>();
1120         lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1121         lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1122         lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1123
1124         when(eventQueue.poll()).thenAnswer(args -> lst.poll());
1125
1126         // route the messages to the OTHER host
1127         CountDownLatch latch = mgr.startDistributing(makeAssignments(false));
1128         assertTrue(latch.await(2, TimeUnit.SECONDS));
1129
1130         // all of the events should have been forwarded
1131         verify(dmaap, times(4)).publish(any());
1132         verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
1133     }
1134
1135     @Test
1136     public void testGoStart() {
1137         State st = mgr.goStart();
1138         assertTrue(st instanceof StartState);
1139         assertEquals(mgr.getHost(), st.getHost());
1140     }
1141
1142     @Test
1143     public void testGoQuery() {
1144         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
1145         mgr.startDistributing(asgn);
1146
1147         State st = mgr.goQuery();
1148
1149         assertTrue(st instanceof QueryState);
1150         assertEquals(mgr.getHost(), st.getHost());
1151         assertEquals(asgn, mgr.getAssignments());
1152     }
1153
1154     @Test
1155     public void testGoActive() {
1156         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
1157         mgr.startDistributing(asgn);
1158
1159         State st = mgr.goActive();
1160
1161         assertTrue(st instanceof ActiveState);
1162         assertEquals(mgr.getHost(), st.getHost());
1163         assertEquals(asgn, mgr.getAssignments());
1164         assertEquals(0, active.getCount());
1165     }
1166
1167     @Test
1168     public void testGoInactive() {
1169         State st = mgr.goInactive();
1170         assertTrue(st instanceof InactiveState);
1171         assertEquals(mgr.getHost(), st.getHost());
1172         assertEquals(1, active.getCount());
1173     }
1174
1175     @Test
1176     public void testTimerActionRun() throws Exception {
1177         // must start the scheduler
1178         startMgr();
1179
1180         CountDownLatch latch = new CountDownLatch(1);
1181
1182         mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
1183             latch.countDown();
1184             return null;
1185         });
1186
1187         // capture the task
1188         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
1189
1190         verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
1191
1192         // execute it
1193         taskCap.getValue().run();
1194
1195         assertEquals(0, latch.getCount());
1196     }
1197
1198     @Test
1199     public void testTimerActionRun_DiffState() throws Exception {
1200         // must start the scheduler
1201         startMgr();
1202
1203         CountDownLatch latch = new CountDownLatch(1);
1204
1205         mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
1206             latch.countDown();
1207             return null;
1208         });
1209
1210         // capture the task
1211         ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
1212
1213         verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
1214
1215         // give it a heartbeat so that it transitions to the query state
1216         StartState st = (StartState) mgr.getCurrent();
1217         Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
1218         hb.setChannel(Message.ADMIN);
1219
1220         String msg = ser.encodeMsg(hb);
1221
1222         mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
1223
1224         assertTrue(mgr.getCurrent() instanceof QueryState);
1225
1226         // execute it
1227         taskCap.getValue().run();
1228
1229         // it should NOT have counted down
1230         assertEquals(1, latch.getCount());
1231     }
1232
1233     /**
1234      * Validates the message content.
1235      * 
1236      * @param tbegin creation time stamp must be no less than this
1237      * @param msg message to be validated
1238      */
1239     private void validateMessageContent(long tbegin, Forward msg) {
1240         assertEquals(0, msg.getNumHops());
1241         assertTrue(msg.getCreateTimeMs() >= tbegin);
1242         assertEquals(mgr.getHost(), msg.getSource());
1243         assertEquals(CommInfrastructure.UEB, msg.getProtocol());
1244         assertEquals(TOPIC2, msg.getTopic());
1245         assertEquals(THE_EVENT, msg.getPayload());
1246         assertEquals(REQUEST_ID, msg.getRequestId());
1247     }
1248
1249     /**
1250      * Configure the mock controller to act like a real controller, invoking beforeOffer
1251      * and then beforeInsert, so we can make sure they pass through. We'll keep count to
1252      * ensure we don't get into infinite recursion.
1253      * 
1254      * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked,
1255      *        {@code false} if it should be skipped
1256      * 
1257      * @return a latch that will be counted down if both beforeXxx() methods return false
1258      */
1259     private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
1260         CountDownLatch recursion = new CountDownLatch(3);
1261         CountDownLatch latch = new CountDownLatch(1);
1262
1263         doAnswer(args -> {
1264
1265             recursion.countDown();
1266             if (recursion.getCount() == 0) {
1267                 fail("recursive calls to onTopicEvent");
1268             }
1269
1270             int iarg = 0;
1271             CommInfrastructure proto = args.getArgument(iarg++);
1272             String topic = args.getArgument(iarg++);
1273             String event = args.getArgument(iarg++);
1274
1275             if (mgr.beforeOffer(proto, topic, event)) {
1276                 return null;
1277             }
1278
1279             if (invokeBeforeInsert && mgr.beforeInsert(proto, topic, event, DECODED_EVENT)) {
1280                 return null;
1281             }
1282
1283             latch.countDown();
1284
1285             return null;
1286         }).when(controller).onTopicEvent(any(), any(), any());
1287
1288         return latch;
1289     }
1290
1291     /**
1292      * Makes an assignment with two buckets.
1293      * 
1294      * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the
1295      *        manager's bucket, {@code false} if it should hash to the other host's bucket
1296      * @return a new bucket assignment
1297      */
1298     private BucketAssignments makeAssignments(boolean sameHost) {
1299         int slot = REQUEST_ID.hashCode() % 2;
1300
1301         // slot numbers are 0 and 1 - reverse them if it's for a different host
1302         if (!sameHost) {
1303             slot = 1 - slot;
1304         }
1305
1306         String[] asgn = new String[2];
1307         asgn[slot] = mgr.getHost();
1308         asgn[1 - slot] = HOST2;
1309
1310         return new BucketAssignments(asgn);
1311     }
1312
1313     /**
1314      * Invokes methods necessary to start the manager.
1315      * 
1316      * @throws PoolingFeatureException if an error occurs
1317      */
1318     private void startMgr() throws PoolingFeatureException {
1319         mgr.beforeStart();
1320         mgr.afterStart();
1321     }
1322
1323     /**
1324      * Invokes methods necessary to lock the manager.
1325      */
1326     private void lockMgr() {
1327         mgr.beforeLock();
1328     }
1329
1330     /**
1331      * Invokes methods necessary to unlock the manager.
1332      */
1333     private void unlockMgr() {
1334         mgr.afterUnlock();
1335     }
1336
1337     /**
1338      * Used to create a mock object that implements both super interfaces.
1339      */
1340     private static interface ListeningController extends TopicListener, PolicyController {
1341
1342     }
1343
1344     /**
1345      * Invokes a method that is expected to throw an exception.
1346      * 
1347      * @param exClass class of exception that is expected
1348      * @param func function to invoke
1349      * @return the exception that was thrown
1350      * @throws AssertionError if no exception was thrown
1351      */
1352     private <T extends Exception> T expectException(Class<T> exClass, ExFunction<T> func) {
1353         try {
1354             func.apply();
1355             throw new AssertionError("missing exception");
1356
1357         } catch (Exception e) {
1358             return exClass.cast(e);
1359         }
1360     }
1361
1362     /**
1363      * Function that is expected to throw an exception.
1364      * 
1365      * @param <T> type of exception the function is expected to throw
1366      */
1367     @FunctionalInterface
1368     private static interface ExFunction<T extends Exception> {
1369
1370         /**
1371          * Invokes the function.
1372          * 
1373          * @throws T if an error occurs
1374          */
1375         public void apply() throws T;
1376
1377     }
1378 }