2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.junit.Assert.fail;
29 import static org.mockito.ArgumentMatchers.any;
30 import static org.mockito.ArgumentMatchers.contains;
31 import static org.mockito.Mockito.doAnswer;
32 import static org.mockito.Mockito.doThrow;
33 import static org.mockito.Mockito.mock;
34 import static org.mockito.Mockito.never;
35 import static org.mockito.Mockito.times;
36 import static org.mockito.Mockito.verify;
37 import static org.mockito.Mockito.when;
38 import java.util.LinkedList;
39 import java.util.Properties;
40 import java.util.Queue;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ScheduledFuture;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.onap.policy.drools.controller.DroolsController;
51 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
52 import org.onap.policy.drools.event.comm.TopicListener;
53 import org.onap.policy.drools.pooling.PoolingManagerImpl.Factory;
54 import org.onap.policy.drools.pooling.extractor.ClassExtractors;
55 import org.onap.policy.drools.pooling.message.BucketAssignments;
56 import org.onap.policy.drools.pooling.message.Forward;
57 import org.onap.policy.drools.pooling.message.Heartbeat;
58 import org.onap.policy.drools.pooling.message.Message;
59 import org.onap.policy.drools.pooling.message.Offline;
60 import org.onap.policy.drools.pooling.state.ActiveState;
61 import org.onap.policy.drools.pooling.state.IdleState;
62 import org.onap.policy.drools.pooling.state.InactiveState;
63 import org.onap.policy.drools.pooling.state.QueryState;
64 import org.onap.policy.drools.pooling.state.StartState;
65 import org.onap.policy.drools.pooling.state.State;
66 import org.onap.policy.drools.system.PolicyController;
68 public class PoolingManagerImplTest {
70 protected static final long STD_HEARTBEAT_WAIT_MS = 10;
71 protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
72 protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
73 protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
74 protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
75 protected static final long STD_OFFLINE_PUB_WAIT_MS = STD_INTER_HEARTBEAT_MS + 1;
77 private static final String MY_HOST = "my.host";
78 private static final String HOST2 = "other.host";
80 private static final String MY_CONTROLLER = "my.controller";
81 private static final String MY_TOPIC = "my.topic";
83 private static final String TOPIC2 = "topic.two";
85 private static final String THE_EVENT = "the event";
87 private static final Object DECODED_EVENT = new Object();
88 private static final String REQUEST_ID = "my.request.id";
91 * Number of dmaap.publish() invocations that should be issued when the manager is
94 private static final int START_PUB = 1;
97 * Saved from PoolingManagerImpl and restored on exit from this test class.
99 private static Factory saveFactory;
102 * Futures that have been allocated due to calls to scheduleXxx().
104 private Queue<ScheduledFuture<?>> futures;
106 private Properties plainProps;
107 private PoolingProperties poolProps;
108 private ListeningController controller;
109 private EventQueue eventQueue;
110 private ClassExtractors extractors;
111 private DmaapManager dmaap;
112 private ScheduledThreadPoolExecutor sched;
113 private DroolsController drools;
114 private Serializer ser;
115 private Factory factory;
116 private CountDownLatch active;
118 private PoolingManagerImpl mgr;
121 public static void setUpBeforeClass() {
122 saveFactory = PoolingManagerImpl.getFactory();
126 public static void tearDownAfterClass() {
127 PoolingManagerImpl.setFactory(saveFactory);
131 public void setUp() throws Exception {
132 plainProps = new Properties();
134 poolProps = mock(PoolingProperties.class);
135 when(poolProps.getSource()).thenReturn(plainProps);
136 when(poolProps.getPoolingTopic()).thenReturn(MY_TOPIC);
137 when(poolProps.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
138 when(poolProps.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
139 when(poolProps.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
140 when(poolProps.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
141 when(poolProps.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
142 when(poolProps.getOfflinePubWaitMs()).thenReturn(STD_OFFLINE_PUB_WAIT_MS);
144 futures = new LinkedList<>();
145 ser = new Serializer();
146 active = new CountDownLatch(1);
148 factory = mock(Factory.class);
149 eventQueue = mock(EventQueue.class);
150 extractors = mock(ClassExtractors.class);
151 dmaap = mock(DmaapManager.class);
152 controller = mock(ListeningController.class);
153 sched = mock(ScheduledThreadPoolExecutor.class);
154 drools = mock(DroolsController.class);
156 when(factory.makeEventQueue(any())).thenReturn(eventQueue);
157 when(factory.makeClassExtractors(any())).thenReturn(extractors);
158 when(factory.makeDmaapManager(any(), any())).thenReturn(dmaap);
159 when(factory.makeScheduler()).thenReturn(sched);
160 when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true);
161 when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT);
163 when(extractors.extract(DECODED_EVENT)).thenReturn(REQUEST_ID);
165 when(controller.getName()).thenReturn(MY_CONTROLLER);
166 when(controller.getDrools()).thenReturn(drools);
167 when(controller.isAlive()).thenReturn(true);
169 when(sched.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))).thenAnswer(args -> {
170 ScheduledFuture<?> fut = mock(ScheduledFuture.class);
176 when(sched.scheduleWithFixedDelay(any(Runnable.class), any(Long.class), any(Long.class), any(TimeUnit.class)))
177 .thenAnswer(args -> {
178 ScheduledFuture<?> fut = mock(ScheduledFuture.class);
184 PoolingManagerImpl.setFactory(factory);
186 mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps, active);
190 public void testPoolingManagerImpl() throws Exception {
191 verify(factory).makeDmaapManager(any(), any());
193 State st = mgr.getCurrent();
194 assertTrue(st instanceof IdleState);
196 // ensure the state is attached to the manager
197 assertEquals(mgr.getHost(), st.getHost());
201 public void testPoolingManagerImpl_ClassEx() {
203 * this controller does not implement TopicListener, which should cause a
206 PolicyController ctlr = mock(PolicyController.class);
208 PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class,
209 () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps, active));
210 assertNotNull(ex.getCause());
211 assertTrue(ex.getCause() instanceof ClassCastException);
215 public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
216 // throw an exception when we try to create the dmaap manager
217 PoolingFeatureException ex = new PoolingFeatureException();
218 when(factory.makeDmaapManager(any(), any())).thenThrow(ex);
220 PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
221 () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active));
222 assertEquals(ex, ex2.getCause());
226 public void testGetCurrent() throws Exception {
227 assertEquals(IdleState.class, mgr.getCurrent().getClass());
231 assertEquals(StartState.class, mgr.getCurrent().getClass());
235 public void testGetHost() {
236 assertEquals(MY_HOST, mgr.getHost());
238 mgr = new PoolingManagerImpl(HOST2, controller, poolProps, active);
239 assertEquals(HOST2, mgr.getHost());
243 public void testGetTopic() {
244 assertEquals(MY_TOPIC, mgr.getTopic());
248 public void testGetProperties() {
249 assertEquals(poolProps, mgr.getProperties());
253 public void testBeforeStart() throws Exception {
257 verify(dmaap).startPublisher();
259 verify(factory).makeScheduler();
260 verify(sched).setMaximumPoolSize(1);
261 verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
264 // try again - nothing should happen
267 verify(dmaap).startPublisher();
269 verify(factory).makeScheduler();
270 verify(sched).setMaximumPoolSize(1);
271 verify(sched).setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
275 public void testBeforeStart_DmaapEx() throws Exception {
276 // generate an exception
277 PoolingFeatureException ex = new PoolingFeatureException();
278 doThrow(ex).when(dmaap).startPublisher();
280 PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, () -> mgr.beforeStart());
281 assertEquals(ex, ex2);
283 // should never start the scheduler
284 verify(factory, never()).makeScheduler();
288 public void testAfterStart() throws Exception {
291 verify(dmaap).startConsumer(mgr);
293 State st = mgr.getCurrent();
294 assertTrue(st instanceof StartState);
296 // ensure the state is attached to the manager
297 assertEquals(mgr.getHost(), st.getHost());
299 ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
300 ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
301 verify(sched).schedule(any(Runnable.class), timeCap.capture(), unitCap.capture());
303 assertEquals(STD_HEARTBEAT_WAIT_MS, timeCap.getValue().longValue());
304 assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
307 // already started - nothing else happens
310 verify(dmaap).startConsumer(mgr);
312 assertTrue(mgr.getCurrent() instanceof StartState);
314 verify(sched).schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class));
318 public void testBeforeStop() throws Exception {
323 verify(dmaap).stopConsumer(mgr);
324 verify(sched).shutdownNow();
325 verify(dmaap).publish(contains("offline"));
327 assertTrue(mgr.getCurrent() instanceof IdleState);
331 public void testBeforeStop_NotRunning() throws Exception {
332 State st = mgr.getCurrent();
336 verify(dmaap, never()).stopConsumer(any());
337 verify(sched, never()).shutdownNow();
339 // hasn't changed states either
340 assertEquals(st, mgr.getCurrent());
344 public void testBeforeStop_AfterPartialStart() throws Exception {
345 // call beforeStart but not afterStart
348 State st = mgr.getCurrent();
352 // should still shut the scheduler down
353 verify(sched).shutdownNow();
355 verify(dmaap, never()).stopConsumer(any());
357 // hasn't changed states
358 assertEquals(st, mgr.getCurrent());
362 public void testAfterStop() throws Exception {
366 when(eventQueue.isEmpty()).thenReturn(false);
367 when(eventQueue.size()).thenReturn(3);
371 verify(eventQueue).clear();
372 verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
376 public void testAfterStop_EmptyQueue() throws Exception {
380 when(eventQueue.isEmpty()).thenReturn(true);
381 when(eventQueue.size()).thenReturn(0);
385 verify(eventQueue, never()).clear();
386 verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
390 public void testBeforeLock() throws Exception {
395 assertTrue(mgr.getCurrent() instanceof IdleState);
399 public void testAfterUnlock_AliveIdle() throws Exception {
400 // this really shouldn't happen
406 // stays in idle state, because it has no scheduler
407 assertTrue(mgr.getCurrent() instanceof IdleState);
411 public void testAfterUnlock_AliveStarted() throws Exception {
417 assertTrue(mgr.getCurrent() instanceof StartState);
421 public void testAfterUnlock_StoppedIdle() throws Exception {
425 // controller is stopped
426 when(controller.isAlive()).thenReturn(false);
430 assertTrue(mgr.getCurrent() instanceof IdleState);
434 public void testAfterUnlock_StoppedStarted() throws Exception {
437 // Note: don't lockMgr()
439 // controller is stopped
440 when(controller.isAlive()).thenReturn(false);
444 assertTrue(mgr.getCurrent() instanceof StartState);
448 public void testChangeState() throws Exception {
449 // start should invoke changeState()
454 // should have set the filter for the StartState
455 verify(dmaap, times(++ntimes)).setFilter(any());
458 * now go offline while it's locked
462 // should have set the new filter
463 verify(dmaap, times(++ntimes)).setFilter(any());
465 // should have cancelled the timers
466 assertEquals(2, futures.size());
467 verify(futures.poll()).cancel(false);
468 verify(futures.poll()).cancel(false);
475 // should have set the new filter
476 verify(dmaap, times(++ntimes)).setFilter(any());
478 // new timers should now be active
479 assertEquals(2, futures.size());
480 verify(futures.poll(), never()).cancel(false);
481 verify(futures.poll(), never()).cancel(false);
485 public void testSetFilter() throws Exception {
486 // start should cause a filter to be set
489 verify(dmaap).setFilter(any());
493 public void testSetFilter_DmaapEx() throws Exception {
495 // generate an exception
496 doThrow(new PoolingFeatureException()).when(dmaap).setFilter(any());
498 // start should invoke setFilter()
501 // no exception, means success
505 public void testInternalTopicFailed() throws Exception {
508 CountDownLatch latch = mgr.internalTopicFailed();
510 // wait for the thread to complete
511 assertTrue(latch.await(2, TimeUnit.SECONDS));
513 verify(controller).stop();
517 public void testSchedule() throws Exception {
518 // must start the scheduler
521 CountDownLatch latch = new CountDownLatch(1);
523 mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
529 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
530 ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
531 ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
533 verify(sched, times(2)).schedule(taskCap.capture(), timeCap.capture(), unitCap.capture());
535 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
536 assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
539 taskCap.getValue().run();
541 assertEquals(0, latch.getCount());
545 public void testScheduleWithFixedDelay() throws Exception {
546 // must start the scheduler
549 CountDownLatch latch = new CountDownLatch(1);
551 mgr.scheduleWithFixedDelay(STD_HEARTBEAT_WAIT_MS, STD_ACTIVE_HEARTBEAT_MS, () -> {
557 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
558 ArgumentCaptor<Long> initCap = ArgumentCaptor.forClass(Long.class);
559 ArgumentCaptor<Long> timeCap = ArgumentCaptor.forClass(Long.class);
560 ArgumentCaptor<TimeUnit> unitCap = ArgumentCaptor.forClass(TimeUnit.class);
562 verify(sched, times(2)).scheduleWithFixedDelay(taskCap.capture(), initCap.capture(), timeCap.capture(),
565 assertEquals(STD_HEARTBEAT_WAIT_MS, initCap.getValue().longValue());
566 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timeCap.getValue().longValue());
567 assertEquals(TimeUnit.MILLISECONDS, unitCap.getValue());
570 taskCap.getValue().run();
572 assertEquals(0, latch.getCount());
576 public void testPublishAdmin() throws Exception {
577 Offline msg = new Offline(mgr.getHost());
578 mgr.publishAdmin(msg);
580 assertEquals(Message.ADMIN, msg.getChannel());
582 verify(dmaap).publish(any());
586 public void testPublish() throws Exception {
587 Offline msg = new Offline(mgr.getHost());
588 mgr.publish("my.channel", msg);
590 assertEquals("my.channel", msg.getChannel());
592 verify(dmaap).publish(any());
596 public void testPublish_InvalidMsg() throws Exception {
597 // message is missing data
598 mgr.publish(Message.ADMIN, new Offline());
600 // should not have attempted to publish it
601 verify(dmaap, never()).publish(any());
605 public void testPublish_DmaapEx() throws Exception {
607 // generate exception
608 doThrow(new PoolingFeatureException()).when(dmaap).publish(any());
610 mgr.publish(Message.ADMIN, new Offline(mgr.getHost()));
614 public void testOnTopicEvent() throws Exception {
617 StartState st = (StartState) mgr.getCurrent();
620 * give it its heart beat, that should cause it to transition to the Query state.
622 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
623 hb.setChannel(Message.ADMIN);
625 String msg = ser.encodeMsg(hb);
627 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
629 assertTrue(mgr.getCurrent() instanceof QueryState);
633 public void testOnTopicEvent_NullEvent() throws Exception {
636 mgr.onTopicEvent(CommInfrastructure.UEB, TOPIC2, null);
640 public void testBeforeOffer_Unlocked_NoIntercept() throws Exception {
643 assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
647 public void testBeforeOffer_Locked_NoIntercept() throws Exception {
652 assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
656 public void testBeforeOffer_Locked_Intercept() throws Exception {
660 // route the message to this host
661 mgr.startDistributing(makeAssignments(true));
663 CountDownLatch latch = catchRecursion(false);
665 Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
668 verify(dmaap, times(START_PUB)).publish(any());
669 verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
671 // ensure we made it past both beforeXxx() methods
672 assertEquals(0, latch.getCount());
676 public void testBeforeInsert_Intercept() throws Exception {
680 // route the message to this host
681 mgr.startDistributing(makeAssignments(true));
683 CountDownLatch latch = catchRecursion(true);
685 Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
688 verify(dmaap, times(START_PUB)).publish(any());
689 verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
691 // ensure we made it past both beforeXxx() methods
692 assertEquals(0, latch.getCount());
696 public void testBeforeInsert_NoIntercept() throws Exception {
699 long tbegin = System.currentTimeMillis();
701 assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
703 ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
704 verify(eventQueue).add(msgCap.capture());
706 validateMessageContent(tbegin, msgCap.getValue());
710 public void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
713 when(extractors.extract(any())).thenReturn(null);
715 assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
719 public void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
722 when(extractors.extract(any())).thenReturn("");
724 assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
728 public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
731 assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
733 // should not have tried to enqueue a message
734 verify(eventQueue, never()).add(any());
738 public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
741 long tbegin = System.currentTimeMillis();
743 assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
745 ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
746 verify(eventQueue).add(msgCap.capture());
748 validateMessageContent(tbegin, msgCap.getValue());
752 public void testHandleExternalForward_NoAssignments() throws Exception {
755 long tbegin = System.currentTimeMillis();
757 assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
759 ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
760 verify(eventQueue).add(msgCap.capture());
762 validateMessageContent(tbegin, msgCap.getValue());
766 public void testHandleExternalForward() throws Exception {
769 // route the message to this host
770 mgr.startDistributing(makeAssignments(true));
772 assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
776 public void testHandleEvent_NullTarget() throws Exception {
779 // buckets have null targets
780 mgr.startDistributing(new BucketAssignments(new String[] {null, null}));
782 assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
784 verify(dmaap, times(START_PUB)).publish(any());
788 public void testHandleEvent_SameHost() throws Exception {
791 // route the message to this host
792 mgr.startDistributing(makeAssignments(true));
794 assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
796 verify(dmaap, times(START_PUB)).publish(any());
800 public void testHandleEvent_DiffHost_TooManyHops() throws Exception {
803 // route the message to this host
804 mgr.startDistributing(makeAssignments(false));
806 Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
807 msg.setNumHops(PoolingManagerImpl.MAX_HOPS + 1);
811 verify(dmaap, times(START_PUB)).publish(any());
812 verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
816 public void testHandleEvent_DiffHost_Forward() throws Exception {
819 // route the message to the *OTHER* host
820 mgr.startDistributing(makeAssignments(false));
822 assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
824 verify(dmaap, times(START_PUB + 1)).publish(any());
828 public void testExtractRequestId_NullEvent() throws Exception {
831 assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, null));
835 public void testExtractRequestId_NullReqId() throws Exception {
838 when(extractors.extract(any())).thenReturn(null);
840 assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
844 public void testExtractRequestId() throws Exception {
847 // route the message to the *OTHER* host
848 mgr.startDistributing(makeAssignments(false));
850 assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
854 public void testDecodeEvent_CannotDecode() throws Exception {
857 when(controller.isLocked()).thenReturn(true);
859 // create assignments, though they are irrelevant
860 mgr.startDistributing(makeAssignments(false));
862 when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(false);
864 assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
868 public void testDecodeEvent_UnsuppEx() throws Exception {
871 when(controller.isLocked()).thenReturn(true);
873 // create assignments, though they are irrelevant
874 mgr.startDistributing(makeAssignments(false));
876 // generate exception
877 doThrow(new UnsupportedOperationException()).when(factory).decodeEvent(drools, TOPIC2, THE_EVENT);
879 assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
883 public void testDecodeEvent_ArgEx() throws Exception {
886 when(controller.isLocked()).thenReturn(true);
888 // create assignments, though they are irrelevant
889 mgr.startDistributing(makeAssignments(false));
891 // generate exception
892 doThrow(new IllegalArgumentException()).when(factory).decodeEvent(drools, TOPIC2, THE_EVENT);
894 assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
898 public void testDecodeEvent_StateEx() throws Exception {
901 when(controller.isLocked()).thenReturn(true);
903 // create assignments, though they are irrelevant
904 mgr.startDistributing(makeAssignments(false));
906 // generate exception
907 doThrow(new IllegalStateException()).when(factory).decodeEvent(drools, TOPIC2, THE_EVENT);
909 assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
913 public void testDecodeEvent() throws Exception {
916 when(controller.isLocked()).thenReturn(true);
918 // route to another host
919 mgr.startDistributing(makeAssignments(false));
921 assertTrue(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
925 public void testMakeForward() throws Exception {
928 long tbegin = System.currentTimeMillis();
930 assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
932 ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
933 verify(eventQueue).add(msgCap.capture());
935 validateMessageContent(tbegin, msgCap.getValue());
939 public void testMakeForward_InvalidMsg() throws Exception {
942 assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
944 // should not have tried to enqueue a message
945 verify(eventQueue, never()).add(any());
949 public void testHandle_SameHost() throws Exception {
952 // route the message to this host
953 mgr.startDistributing(makeAssignments(true));
955 Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
958 verify(dmaap, times(START_PUB)).publish(any());
959 verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
963 public void testHandle_DiffHost() throws Exception {
966 // route the message to this host
967 mgr.startDistributing(makeAssignments(false));
969 Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
972 verify(dmaap, times(START_PUB + 1)).publish(any());
973 verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
977 public void testInject() throws Exception {
980 // route the message to this host
981 mgr.startDistributing(makeAssignments(true));
983 CountDownLatch latch = catchRecursion(true);
985 Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
988 verify(dmaap, times(START_PUB)).publish(any());
989 verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
991 // ensure we made it past both beforeXxx() methods
992 assertEquals(0, latch.getCount());
996 public void testInject_Ex() throws Exception {
999 // route the message to this host
1000 mgr.startDistributing(makeAssignments(true));
1002 // generate RuntimeException when onTopicEvent() is invoked
1003 doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
1005 CountDownLatch latch = catchRecursion(true);
1007 Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
1010 verify(dmaap, times(START_PUB)).publish(any());
1011 verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
1013 // ensure we made it past both beforeXxx() methods
1014 assertEquals(0, latch.getCount());
1018 public void testHandleInternal() throws Exception {
1021 StartState st = (StartState) mgr.getCurrent();
1024 * give it its heart beat, that should cause it to transition to the Query state.
1026 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
1027 hb.setChannel(Message.ADMIN);
1029 String msg = ser.encodeMsg(hb);
1031 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
1033 assertTrue(mgr.getCurrent() instanceof QueryState);
1037 public void testHandleInternal_IOEx() throws Exception {
1040 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, "invalid message");
1042 assertTrue(mgr.getCurrent() instanceof StartState);
1046 public void testHandleInternal_PoolEx() throws Exception {
1049 StartState st = (StartState) mgr.getCurrent();
1051 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
1054 * do NOT set the channel - this will cause the message to be invalid, triggering
1058 String msg = ser.encodeMsg(hb);
1060 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
1062 assertTrue(mgr.getCurrent() instanceof StartState);
1066 public void testStartDistributing() throws Exception {
1069 // route the message to this host
1070 assertNotNull(mgr.startDistributing(makeAssignments(true)));
1071 assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
1072 verify(eventQueue, never()).add(any());
1075 // null assignments should cause message to be queued
1076 assertNull(mgr.startDistributing(null));
1077 assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
1078 verify(eventQueue).add(any());
1081 // route the message to this host
1082 assertNotNull(mgr.startDistributing(makeAssignments(true)));
1083 assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
1084 verify(eventQueue).add(any());
1087 // route the message to the other host
1088 assertNotNull(mgr.startDistributing(makeAssignments(false)));
1089 assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
1090 verify(eventQueue).add(any());
1094 public void testStartDistributing_EventsInQueue_ProcessLocally() throws Exception {
1097 // put items in the queue
1098 LinkedList<Forward> lst = new LinkedList<>();
1099 lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1100 lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1101 lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1103 when(eventQueue.poll()).thenAnswer(args -> lst.poll());
1105 // route the messages to this host
1106 CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
1107 assertTrue(latch.await(2, TimeUnit.SECONDS));
1109 // all of the events should have been processed locally
1110 verify(dmaap, times(START_PUB)).publish(any());
1111 verify(controller, times(3)).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
1115 public void testStartDistributing_EventsInQueue_Forward() throws Exception {
1118 // put items in the queue
1119 LinkedList<Forward> lst = new LinkedList<>();
1120 lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1121 lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1122 lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
1124 when(eventQueue.poll()).thenAnswer(args -> lst.poll());
1126 // route the messages to the OTHER host
1127 CountDownLatch latch = mgr.startDistributing(makeAssignments(false));
1128 assertTrue(latch.await(2, TimeUnit.SECONDS));
1130 // all of the events should have been forwarded
1131 verify(dmaap, times(4)).publish(any());
1132 verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
1136 public void testGoStart() {
1137 State st = mgr.goStart();
1138 assertTrue(st instanceof StartState);
1139 assertEquals(mgr.getHost(), st.getHost());
1143 public void testGoQuery() {
1144 BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
1145 mgr.startDistributing(asgn);
1147 State st = mgr.goQuery();
1149 assertTrue(st instanceof QueryState);
1150 assertEquals(mgr.getHost(), st.getHost());
1151 assertEquals(asgn, mgr.getAssignments());
1155 public void testGoActive() {
1156 BucketAssignments asgn = new BucketAssignments(new String[] {HOST2});
1157 mgr.startDistributing(asgn);
1159 State st = mgr.goActive();
1161 assertTrue(st instanceof ActiveState);
1162 assertEquals(mgr.getHost(), st.getHost());
1163 assertEquals(asgn, mgr.getAssignments());
1164 assertEquals(0, active.getCount());
1168 public void testGoInactive() {
1169 State st = mgr.goInactive();
1170 assertTrue(st instanceof InactiveState);
1171 assertEquals(mgr.getHost(), st.getHost());
1172 assertEquals(1, active.getCount());
1176 public void testTimerActionRun() throws Exception {
1177 // must start the scheduler
1180 CountDownLatch latch = new CountDownLatch(1);
1182 mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
1188 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
1190 verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
1193 taskCap.getValue().run();
1195 assertEquals(0, latch.getCount());
1199 public void testTimerActionRun_DiffState() throws Exception {
1200 // must start the scheduler
1203 CountDownLatch latch = new CountDownLatch(1);
1205 mgr.schedule(STD_ACTIVE_HEARTBEAT_MS, () -> {
1211 ArgumentCaptor<Runnable> taskCap = ArgumentCaptor.forClass(Runnable.class);
1213 verify(sched, times(2)).schedule(taskCap.capture(), any(Long.class), any(TimeUnit.class));
1215 // give it a heartbeat so that it transitions to the query state
1216 StartState st = (StartState) mgr.getCurrent();
1217 Heartbeat hb = new Heartbeat(mgr.getHost(), st.getHbTimestampMs());
1218 hb.setChannel(Message.ADMIN);
1220 String msg = ser.encodeMsg(hb);
1222 mgr.onTopicEvent(CommInfrastructure.UEB, MY_TOPIC, msg);
1224 assertTrue(mgr.getCurrent() instanceof QueryState);
1227 taskCap.getValue().run();
1229 // it should NOT have counted down
1230 assertEquals(1, latch.getCount());
1234 * Validates the message content.
1236 * @param tbegin creation time stamp must be no less than this
1237 * @param msg message to be validated
1239 private void validateMessageContent(long tbegin, Forward msg) {
1240 assertEquals(0, msg.getNumHops());
1241 assertTrue(msg.getCreateTimeMs() >= tbegin);
1242 assertEquals(mgr.getHost(), msg.getSource());
1243 assertEquals(CommInfrastructure.UEB, msg.getProtocol());
1244 assertEquals(TOPIC2, msg.getTopic());
1245 assertEquals(THE_EVENT, msg.getPayload());
1246 assertEquals(REQUEST_ID, msg.getRequestId());
1250 * Configure the mock controller to act like a real controller, invoking beforeOffer
1251 * and then beforeInsert, so we can make sure they pass through. We'll keep count to
1252 * ensure we don't get into infinite recursion.
1254 * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked,
1255 * {@code false} if it should be skipped
1257 * @return a latch that will be counted down if both beforeXxx() methods return false
1259 private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
1260 CountDownLatch recursion = new CountDownLatch(3);
1261 CountDownLatch latch = new CountDownLatch(1);
1265 recursion.countDown();
1266 if (recursion.getCount() == 0) {
1267 fail("recursive calls to onTopicEvent");
1271 CommInfrastructure proto = args.getArgument(iarg++);
1272 String topic = args.getArgument(iarg++);
1273 String event = args.getArgument(iarg++);
1275 if (mgr.beforeOffer(proto, topic, event)) {
1279 if (invokeBeforeInsert && mgr.beforeInsert(proto, topic, event, DECODED_EVENT)) {
1286 }).when(controller).onTopicEvent(any(), any(), any());
1292 * Makes an assignment with two buckets.
1294 * @param sameHost {@code true} if the {@link #REQUEST_ID} should hash to the
1295 * manager's bucket, {@code false} if it should hash to the other host's bucket
1296 * @return a new bucket assignment
1298 private BucketAssignments makeAssignments(boolean sameHost) {
1299 int slot = REQUEST_ID.hashCode() % 2;
1301 // slot numbers are 0 and 1 - reverse them if it's for a different host
1306 String[] asgn = new String[2];
1307 asgn[slot] = mgr.getHost();
1308 asgn[1 - slot] = HOST2;
1310 return new BucketAssignments(asgn);
1314 * Invokes methods necessary to start the manager.
1316 * @throws PoolingFeatureException if an error occurs
1318 private void startMgr() throws PoolingFeatureException {
1324 * Invokes methods necessary to lock the manager.
1326 private void lockMgr() {
1331 * Invokes methods necessary to unlock the manager.
1333 private void unlockMgr() {
1338 * Used to create a mock object that implements both super interfaces.
1340 private static interface ListeningController extends TopicListener, PolicyController {
1345 * Invokes a method that is expected to throw an exception.
1347 * @param exClass class of exception that is expected
1348 * @param func function to invoke
1349 * @return the exception that was thrown
1350 * @throws AssertionError if no exception was thrown
1352 private <T extends Exception> T expectException(Class<T> exClass, ExFunction<T> func) {
1355 throw new AssertionError("missing exception");
1357 } catch (Exception e) {
1358 return exClass.cast(e);
1363 * Function that is expected to throw an exception.
1365 * @param <T> type of exception the function is expected to throw
1367 @FunctionalInterface
1368 private static interface ExFunction<T extends Exception> {
1371 * Invokes the function.
1373 * @throws T if an error occurs
1375 public void apply() throws T;