01253fbf2b9ed4a44485e7806aa204cb8631265a
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / FeatureTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
30
31 import com.fasterxml.jackson.core.type.TypeReference;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Deque;
36 import java.util.IdentityHashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Properties;
40 import java.util.TreeMap;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentMap;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.LinkedBlockingQueue;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicInteger;
49 import java.util.concurrent.atomic.AtomicReference;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.invocation.InvocationOnMock;
54 import org.mockito.stubbing.Answer;
55 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
56 import org.onap.policy.common.endpoints.event.comm.Topic;
57 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
58 import org.onap.policy.common.endpoints.event.comm.TopicListener;
59 import org.onap.policy.common.endpoints.event.comm.TopicSink;
60 import org.onap.policy.common.endpoints.event.comm.TopicSource;
61 import org.onap.policy.drools.controller.DroolsController;
62 import org.onap.policy.drools.pooling.message.Message;
63 import org.onap.policy.drools.system.PolicyController;
64 import org.onap.policy.drools.system.PolicyEngine;
65 import org.onap.policy.drools.utils.Pair;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 /**
70  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
71  * its own feature object. Uses real feature objects. However, the following are not:
72  * <dl>
73  * <dt>DMaaP sources and sinks</dt>
74  * <dd>simulated using queues. There is one queue for the external topic, and one queue
75  * for each host's internal topic. Messages published to the "admin" channel are simply
76  * sent to all of the hosts' internal topic queues</dd>
77  * <dt>PolicyEngine, PolicyController, DroolsController</dt>
78  * <dd>mocked</dd>
79  * </dl>
80  * 
81  * <p>Invoke {@link #runSlow()}, before the test, to slow things down.
82  */
83 public class FeatureTest {
84
85     private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
86
87     /**
88      * Name of the topic used for inter-host communication.
89      */
90     private static final String INTERNAL_TOPIC = "my.internal.topic";
91
92     /**
93      * Name of the topic from which "external" events "arrive".
94      */
95     private static final String EXTERNAL_TOPIC = "my.external.topic";
96
97     /**
98      * Name of the controller.
99      */
100     private static final String CONTROLLER1 = "controller.one";
101
102     private static long stdReactivateWaitMs = 200;
103     private static long stdIdentificationMs = 60;
104     private static long stdStartHeartbeatMs = 60;
105     private static long stdActiveHeartbeatMs = 50;
106     private static long stdInterHeartbeatMs = 5;
107     private static long stdOfflinePubWaitMs = 2;
108     private static long stdPollMs = 2;
109     private static long stdInterPollMs = 2;
110     private static long stdEventWaitSec = 10;
111
112     /**
113      * Used to decode events into a Map.
114      */
115     private static final TypeReference<TreeMap<String, String>> typeRef =
116                     new TypeReference<TreeMap<String, String>>() {};
117
118     /**
119      * Used to decode events from the external topic.
120      */
121     private static final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
122         @Override
123         protected ObjectMapper initialValue() {
124             return new ObjectMapper();
125         }
126     };
127
128     /**
129      * Used to identify the current context.
130      */
131     private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
132
133     /**
134      * Context for the current test case.
135      */
136     private Context ctx;
137
138     /**
139      * Setup.
140      */
141     @Before
142     public void setUp() {
143         ctx = null;
144     }
145
146     /**
147      * Tear down.
148      */
149     @After
150     public void tearDown() {
151         if (ctx != null) {
152             ctx.destroy();
153         }
154     }
155
156     @Test
157     public void test_SingleHost() throws Exception {
158         run(70, 1);
159     }
160
161     @Test
162     public void test_TwoHosts() throws Exception {
163         run(200, 2);
164     }
165
166     @Test
167     public void test_ThreeHosts() throws Exception {
168         run(200, 3);
169     }
170
171     private void run(int nmessages, int nhosts) throws Exception {
172         ctx = new Context(nmessages);
173
174         for (int x = 0; x < nhosts; ++x) {
175             ctx.addHost();
176         }
177
178         ctx.startHosts();
179
180         for (int x = 0; x < nmessages; ++x) {
181             ctx.offerExternal(makeMessage(x));
182         }
183
184         ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
185
186         assertEquals(0, ctx.getDecodeErrors());
187         assertEquals(0, ctx.getRemainingEvents());
188         ctx.checkAllSawAMsg();
189     }
190
191     private String makeMessage(int reqnum) {
192         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
193     }
194     
195     /**
196      * Invoke this to slow the timers down.
197      */
198     protected static void runSlow() {
199         stdReactivateWaitMs = 10000;
200         stdIdentificationMs = 10000;
201         stdStartHeartbeatMs = 15000;
202         stdActiveHeartbeatMs = 12000;
203         stdInterHeartbeatMs = 5000;
204         stdOfflinePubWaitMs = 2;
205         stdPollMs = 2;
206         stdInterPollMs = 2000;
207         stdEventWaitSec = 1000;
208     }
209
210     /**
211      * Decodes an event.
212      * 
213      * @param event event
214      * @return the decoded event, or {@code null} if it cannot be decoded
215      */
216     private static Object decodeEvent(String event) {
217         try {
218             return mapper.get().readValue(event, typeRef);
219
220         } catch (IOException e) {
221             logger.warn("cannot decode external event", e);
222             return null;
223         }
224     }
225
226     /**
227      * Context used for a single test case.
228      */
229     private static class Context {
230
231         /**
232          * Hosts that have been added to this context.
233          */
234         private final Deque<Host> hosts = new LinkedList<>();
235
236         /**
237          * Maps a drools controller to its policy controller.
238          */
239         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
240
241         /**
242          * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
243          */
244         private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
245
246         /**
247          * Queue for the external "DMaaP" topic.
248          */
249         private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
250
251         /**
252          * Counts the number of decode errors.
253          */
254         private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
255
256         /**
257          * Number of events we're still waiting to receive.
258          */
259         private final CountDownLatch eventCounter;
260
261         /**
262          * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
263          * {@link #getCurrentHost()}.
264          */
265         private Host currentHost = null;
266
267         /**
268          * Constructor.
269          * 
270          * @param nEvents number of events to be processed
271          */
272         public Context(int events) {
273             eventCounter = new CountDownLatch(events);
274         }
275
276         /**
277          * Destroys the context, stopping any hosts that remain.
278          */
279         public void destroy() {
280             stopHosts();
281             hosts.clear();
282         }
283
284         /**
285          * Creates and adds a new host to the context.
286          * 
287          * @return the new Host
288          */
289         public Host addHost() {
290             Host host = new Host(this);
291             hosts.add(host);
292
293             return host;
294         }
295
296         /**
297          * Starts the hosts.
298          */
299         public void startHosts() {
300             hosts.forEach(host -> host.start());
301         }
302
303         /**
304          * Stops the hosts.
305          */
306         public void stopHosts() {
307             hosts.forEach(host -> host.stop());
308         }
309
310         /**
311          * Verifies that all hosts processed at least one message.
312          */
313         public void checkAllSawAMsg() {
314             int msgs = 0;
315             for (Host host : hosts) {
316                 assertTrue("msgs=" + msgs, host.messageSeen());
317                 ++msgs;
318             }
319         }
320
321         /**
322          * Sets {@link #currentHost} to the specified host, and then invokes the given
323          * function. Resets {@link #currentHost} to {@code null} before returning.
324          * 
325          * @param host host
326          * @param func function to invoke
327          */
328         public void withHost(Host host, VoidFunction func) {
329             currentHost = host;
330             func.apply();
331             currentHost = null;
332         }
333
334         /**
335          * Offers an event to the external topic.
336          * 
337          * @param event event
338          */
339         public void offerExternal(String event) {
340             externalTopic.offer(event);
341         }
342
343         /**
344          * Adds an internal channel to the set of channels.
345          * 
346          * @param channel channel
347          * @param queue the channel's queue
348          */
349         public void addInternal(String channel, BlockingQueue<String> queue) {
350             channel2queue.put(channel, queue);
351         }
352
353         /**
354          * Offers a message to all internal channels.
355          * 
356          * @param message message
357          */
358         public void offerInternal(String message) {
359             channel2queue.values().forEach(queue -> queue.offer(message));
360         }
361
362         /**
363          * Offers amessage to an internal channel.
364          * 
365          * @param channel channel
366          * @param message message
367          */
368         public void offerInternal(String channel, String message) {
369             BlockingQueue<String> queue = channel2queue.get(channel);
370             if (queue != null) {
371                 queue.offer(message);
372             }
373         }
374
375         /**
376          * Associates a controller with its drools controller.
377          * 
378          * @param controller controller
379          * @param droolsController drools controller
380          */
381         public void addController(PolicyController controller, DroolsController droolsController) {
382             drools2policy.put(droolsController, controller);
383         }
384
385         /**
386          * Get controller.
387          * 
388          * @param droolsController drools controller
389          * @return the controller associated with a drools controller, or {@code null} if
390          *         it has no associated controller
391          */
392         public PolicyController getController(DroolsController droolsController) {
393             return drools2policy.get(droolsController);
394         }
395
396         /**
397          * Constructor.
398          * 
399          * @return queue for the external topic
400          */
401         public BlockingQueue<String> getExternalTopic() {
402             return externalTopic;
403         }
404
405         /**
406          * Get decode errors.
407          * 
408          * @return the number of decode errors so far
409          */
410         public int getDecodeErrors() {
411             return numDecodeErrors.get();
412         }
413
414         /**
415          * Increments the count of decode errors.
416          */
417         public void bumpDecodeErrors() {
418             numDecodeErrors.incrementAndGet();
419         }
420
421         /**
422          * Get remaining events.
423          * 
424          * @return the number of events that haven't been processed
425          */
426         public long getRemainingEvents() {
427             return eventCounter.getCount();
428         }
429
430         /**
431          * Adds an event to the counter.
432          */
433         public void addEvent() {
434             eventCounter.countDown();
435         }
436
437         /**
438          * Waits, for a period of time, for all events to be processed.
439          * 
440          * @param time time
441          * @param units units
442          * @return {@code true} if all events have been processed, {@code false} otherwise
443          * @throws InterruptedException throws interrupted
444          */
445         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
446             return eventCounter.await(time, units);
447         }
448
449         /**
450          * Gets the current host, provided this is used from within a call to
451          * {@link #withHost(Host, VoidFunction)}.
452          * 
453          * @return the current host, or {@code null} if there is no current host
454          */
455         public Host getCurrentHost() {
456             return currentHost;
457         }
458     }
459
460     /**
461      * Simulates a single "host".
462      */
463     private static class Host {
464
465         private final Context context;
466
467         private final PoolingFeature feature;
468
469         /**
470          * {@code True} if this host has processed a message, {@code false} otherwise.
471          */
472         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
473
474         /**
475          * This host's internal "DMaaP" topic.
476          */
477         private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
478
479         /**
480          * Source that reads from the external topic and posts to the listener.
481          */
482         private TopicSource externalSource;
483
484         // mock objects
485         private final PolicyEngine engine = mock(PolicyEngine.class);
486         private final ListenerController controller = mock(ListenerController.class);
487         private final DroolsController drools = mock(DroolsController.class);
488
489         /**
490          * Constructor.
491          * 
492          * @param context context
493          */
494         public Host(Context context) {
495             this.context = context;
496
497             when(controller.getName()).thenReturn(CONTROLLER1);
498             when(controller.getDrools()).thenReturn(drools);
499
500             // stop consuming events if the controller stops
501             when(controller.stop()).thenAnswer(args -> {
502                 externalSource.unregister(controller);
503                 return true;
504             });
505
506             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
507
508             context.addController(controller, drools);
509
510             // arrange to read from the external topic
511             externalSource = new TopicSourceImpl(context, false);
512             
513             feature = new PoolingFeatureImpl(context);
514         }
515
516         /**
517          * Get name.
518          * 
519          * @return the host name
520          */
521         public String getName() {
522             return feature.getHost();
523         }
524
525         /**
526          * Starts threads for the host so that it begins consuming from both the external
527          * "DMaaP" topic and its own internal "DMaaP" topic.
528          */
529         public void start() {
530
531             context.withHost(this, () -> {
532
533                 feature.beforeStart(engine);
534                 feature.afterCreate(controller);
535
536                 // assign the queue for this host's internal topic
537                 context.addInternal(getName(), msgQueue);
538
539                 feature.beforeStart(controller);
540
541                 // start consuming events from the external topic
542                 externalSource.register(controller);
543
544                 feature.afterStart(controller);
545             });
546         }
547
548         /**
549          * Stops the host's threads.
550          */
551         public void stop() {
552             feature.beforeStop(controller);
553             externalSource.unregister(controller);
554             feature.afterStop(controller);
555         }
556
557         /**
558          * Offers an event to the feature, before the policy controller handles it.
559          * 
560          * @param protocol protocol
561          * @param topic2 topic
562          * @param event event
563          * @return {@code true} if the event was handled, {@code false} otherwise
564          */
565         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
566             return feature.beforeOffer(controller, protocol, topic2, event);
567         }
568
569         /**
570          * Offers an event to the feature, after the policy controller handles it.
571          * 
572          * @param protocol protocol
573          * @param topic topic
574          * @param event event
575          * @param success success
576          * @return {@code true} if the event was handled, {@code false} otherwise
577          */
578         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
579
580             return feature.afterOffer(controller, protocol, topic, event, success);
581         }
582
583         /**
584          * Offers an event to the feature, before the drools controller handles it.
585          * 
586          * @param fact fact
587          * @return {@code true} if the event was handled, {@code false} otherwise
588          */
589         public boolean beforeInsert(Object fact) {
590             return feature.beforeInsert(drools, fact);
591         }
592
593         /**
594          * Offers an event to the feature, after the drools controller handles it.
595          * 
596          * @param fact fact
597          * @param successInsert {@code true} if it was successfully inserted by the drools
598          *        controller, {@code false} otherwise
599          * @return {@code true} if the event was handled, {@code false} otherwise
600          */
601         public boolean afterInsert(Object fact, boolean successInsert) {
602             return feature.afterInsert(drools, fact, successInsert);
603         }
604
605         /**
606          * Indicates that a message was seen for this host.
607          */
608         public void sawMessage() {
609             sawMsg.set(true);
610         }
611
612         /**
613          * Message seen.
614          * 
615          * @return {@code true} if a message was seen for this host, {@code false}
616          *         otherwise
617          */
618         public boolean messageSeen() {
619             return sawMsg.get();
620         }
621
622         /**
623          * Get internal queue.
624          * 
625          * @return the queue associated with this host's internal topic
626          */
627         public BlockingQueue<String> getInternalQueue() {
628             return msgQueue;
629         }
630     }
631
632     /**
633      * Listener for the external topic. Simulates the actions taken by
634      * <i>AggregatedPolicyController.onTopicEvent</i>.
635      */
636     private static class MyExternalTopicListener implements Answer<Void> {
637
638         private final Context context;
639         private final Host host;
640
641         public MyExternalTopicListener(Context context, Host host) {
642             this.context = context;
643             this.host = host;
644         }
645
646         @Override
647         public Void answer(InvocationOnMock args) throws Throwable {
648             int index = 0;
649             CommInfrastructure commType = args.getArgument(index++);
650             String topic = args.getArgument(index++);
651             String event = args.getArgument(index++);
652
653             if (host.beforeOffer(commType, topic, event)) {
654                 return null;
655             }
656
657             boolean result;
658             Object fact = decodeEvent(event);
659
660             if (fact == null) {
661                 result = false;
662                 context.bumpDecodeErrors();
663
664             } else {
665                 result = true;
666
667                 if (!host.beforeInsert(fact)) {
668                     // feature did not handle it so we handle it here
669                     host.afterInsert(fact, result);
670
671                     host.sawMessage();
672                     context.addEvent();
673                 }
674             }
675
676             host.afterOffer(commType, topic, event, result);
677             return null;
678         }
679     }
680
681     /**
682      * Sink implementation that puts a message on the queue specified by the
683      * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
684      * message is placed on all queues.
685      */
686     private static class TopicSinkImpl extends TopicImpl implements TopicSink {
687
688         private final Context context;
689
690         /**
691          * Used to decode the messages so that the channel can be extracted.
692          */
693         private final Serializer serializer = new Serializer();
694
695         /**
696          * Constructor.
697          * 
698          * @param context context
699          */
700         public TopicSinkImpl(Context context) {
701             this.context = context;
702         }
703
704         @Override
705         public synchronized boolean send(String message) {
706             if (!isAlive()) {
707                 return false;
708             }
709
710             try {
711                 Message msg = serializer.decodeMsg(message);
712                 String channel = msg.getChannel();
713
714                 if (Message.ADMIN.equals(channel)) {
715                     // add to every queue
716                     context.offerInternal(message);
717
718                 } else {
719                     // add to a specific queue
720                     context.offerInternal(channel, message);
721                 }
722
723                 return true;
724
725             } catch (IOException e) {
726                 logger.warn("could not decode message: {}", message);
727                 context.bumpDecodeErrors();
728                 return false;
729             }
730         }
731     }
732
733     /**
734      * Source implementation that reads from a queue associated with a topic.
735      */
736     private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
737
738         private final String topic;
739
740         /**
741          * Queue from which to retrieve messages.
742          */
743         private final BlockingQueue<String> queue;
744
745         /**
746          * Manages the current consumer thread. The "first" item is used as a trigger to
747          * tell the thread to stop processing, while the "second" item is triggered <i>by
748          * the thread</i> when it completes.
749          */
750         private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
751
752         /**
753          * Constructor.
754          * 
755          * @param context context
756          * @param internal {@code true} if to read from the internal topic, {@code false}
757          *        to read from the external topic
758          */
759         public TopicSourceImpl(Context context, boolean internal) {
760             if (internal) {
761                 this.topic = INTERNAL_TOPIC;
762                 this.queue = context.getCurrentHost().getInternalQueue();
763
764             } else {
765                 this.topic = EXTERNAL_TOPIC;
766                 this.queue = context.getExternalTopic();
767             }
768         }
769
770         @Override
771         public void setFilter(String filter) {
772             logger.info("topic filter set to: {}", filter);
773         }
774
775         @Override
776         public String getTopic() {
777             return topic;
778         }
779
780         @Override
781         public boolean offer(String event) {
782             throw new UnsupportedOperationException("offer topic source");
783         }
784
785         /**
786          * Starts a thread that takes messages from the queue and gives them to the
787          * listener. Stops the thread of any previously registered listener.
788          */
789         @Override
790         public void register(TopicListener listener) {
791             Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1));
792
793             reregister(newPair);
794
795             Thread thread = new Thread(() -> {
796
797                 try {
798                     do {
799                         processMessages(newPair.first(), listener);
800                     } 
801                     while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
802
803                     logger.info("topic source thread completed");
804
805                 } catch (InterruptedException e) {
806                     logger.warn("topic source thread aborted", e);
807                     Thread.currentThread().interrupt();
808
809                 } catch (RuntimeException e) {
810                     logger.warn("topic source thread aborted", e);
811                 }
812
813                 newPair.second().countDown();
814
815             });
816
817             thread.setDaemon(true);
818             thread.start();
819         }
820
821         /**
822          * Stops the thread of <i>any</i> currently registered listener.
823          */
824         @Override
825         public void unregister(TopicListener listener) {
826             reregister(null);
827         }
828
829         /**
830          * Registers a new "pair" with this source, stopping the consumer associated with
831          * any previous registration.
832          * 
833          * @param newPair the new "pair", or {@code null} to unregister
834          */
835         private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
836             try {
837                 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
838                 if (oldPair == null) {
839                     if (newPair == null) {
840                         // unregister was invoked twice in a row
841                         logger.warn("re-unregister for topic source");
842                     }
843
844                     // no previous thread to stop
845                     return;
846                 }
847
848                 // need to stop the previous thread
849
850                 // tell it to stop
851                 oldPair.first().countDown();
852
853                 // wait for it to stop
854                 if (!oldPair.second().await(2, TimeUnit.SECONDS)) {
855                     logger.warn("old topic registration is still running");
856                 }
857
858             } catch (InterruptedException e) {
859                 logger.warn("old topic registration may still be running", e);
860                 Thread.currentThread().interrupt();
861             }
862
863             if (newPair != null) {
864                 // register was invoked twice in a row
865                 logger.warn("re-register for topic source");
866             }
867         }
868
869         /**
870          * Polls for messages from the topic and offers them to the listener.
871          * 
872          * @param stopped triggered if processing should stop
873          * @param listener listener
874          * @throws InterruptedException throws interrupted exception
875          */
876         private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
877
878             for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
879
880                 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
881                 if (msg == null) {
882                     return;
883                 }
884
885                 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
886             }
887         }
888     }
889
890     /**
891      * Topic implementation. Most methods just throw
892      * {@link UnsupportedOperationException}.
893      */
894     private static class TopicImpl implements Topic {
895
896         /**
897          * Constructor.
898          */
899         public TopicImpl() {
900             super();
901         }
902
903         @Override
904         public String getTopic() {
905             return INTERNAL_TOPIC;
906         }
907
908         @Override
909         public CommInfrastructure getTopicCommInfrastructure() {
910             throw new UnsupportedOperationException("topic protocol");
911         }
912
913         @Override
914         public List<String> getServers() {
915             throw new UnsupportedOperationException("topic servers");
916         }
917
918         @Override
919         public String[] getRecentEvents() {
920             throw new UnsupportedOperationException("topic events");
921         }
922
923         @Override
924         public void register(TopicListener topicListener) {
925             throw new UnsupportedOperationException("register topic");
926         }
927
928         @Override
929         public void unregister(TopicListener topicListener) {
930             throw new UnsupportedOperationException("unregister topic");
931         }
932
933         @Override
934         public synchronized boolean start() {
935             return true;
936         }
937
938         @Override
939         public synchronized boolean stop() {
940             return true;
941         }
942
943         @Override
944         public synchronized void shutdown() {
945             // do nothing
946         }
947
948         @Override
949         public synchronized boolean isAlive() {
950             return true;
951         }
952
953         @Override
954         public boolean lock() {
955             throw new UnsupportedOperationException("lock topicink");
956         }
957
958         @Override
959         public boolean unlock() {
960             throw new UnsupportedOperationException("unlock topic");
961         }
962
963         @Override
964         public boolean isLocked() {
965             throw new UnsupportedOperationException("topic isLocked");
966         }
967     }
968
969     /**
970      * Feature with overrides.
971      */
972     private static class PoolingFeatureImpl extends PoolingFeature {
973
974         private final Context context;
975
976         /**
977          * Constructor.
978          * 
979          * @param context context
980          */
981         public PoolingFeatureImpl(Context context) {
982             this.context = context;
983
984             /*
985              * Note: do NOT extract anything from "context" at this point, because it
986              * hasn't been fully initialized yet
987              */
988         }
989
990         @Override
991         public Properties getProperties(String featName) {
992             Properties props = new Properties();
993
994             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
995
996             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
997             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
998             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
999             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
1000             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
1001                             "" + stdOfflinePubWaitMs);
1002             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
1003                             "" + stdStartHeartbeatMs);
1004             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
1005             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
1006             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
1007                             "" + stdActiveHeartbeatMs);
1008             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
1009                             "" + stdInterHeartbeatMs);
1010
1011             return props;
1012         }
1013
1014         @Override
1015         public PolicyController getController(DroolsController droolsController) {
1016             return context.getController(droolsController);
1017         }
1018
1019         /**
1020          * Embeds a specializer within a property name, after the prefix.
1021          * 
1022          * @param propnm property name into which it should be embedded
1023          * @param spec specializer to be embedded
1024          * @return the property name, with the specializer embedded within it
1025          */
1026         private String specialize(String propnm, String spec) {
1027             String suffix = propnm.substring(PREFIX.length());
1028             return PREFIX + spec + "." + suffix;
1029         }
1030
1031         @Override
1032         protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
1033                         CountDownLatch activeLatch) {
1034
1035             currentContext.set(context);
1036             
1037             return new PoolingManagerTest(host, controller, props, activeLatch);
1038         }
1039     }
1040
1041     /**
1042      * Pooling Manager with overrides.
1043      */
1044     private static class PoolingManagerTest extends PoolingManagerImpl {
1045
1046         /**
1047          * Constructor.
1048          * 
1049          * @param host the host
1050          * @param controller the controller
1051          * @param props the properties
1052          * @param activeLatch the latch
1053          */
1054         public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
1055                         CountDownLatch activeLatch) {
1056
1057             super(host, controller, props, activeLatch);
1058         }
1059
1060         @Override
1061         protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
1062             return new DmaapManagerImpl(topic);
1063         }
1064
1065         @Override
1066         protected boolean canDecodeEvent(DroolsController drools, String topic) {
1067             return true;
1068         }
1069
1070         @Override
1071         protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
1072             return decodeEvent(event);
1073         }
1074     }
1075
1076     /**
1077      * DMaaP Manager with overrides.
1078      */
1079     private static class DmaapManagerImpl extends DmaapManager {
1080
1081         /**
1082          * Constructor.
1083          * 
1084          * @param context this manager's context
1085          * @param topic the topic
1086          * @throws PoolingFeatureException if an error occurs
1087          */
1088         public DmaapManagerImpl(String topic) throws PoolingFeatureException {
1089             super(topic);
1090         }
1091
1092         @Override
1093         protected List<TopicSource> getTopicSources() {
1094             return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
1095         }
1096
1097         @Override
1098         protected List<TopicSink> getTopicSinks() {
1099             return Arrays.asList(new TopicSinkImpl(currentContext.get()));
1100         }
1101     }
1102
1103     /**
1104      * Controller that also implements the {@link TopicListener} interface.
1105      */
1106     private static interface ListenerController extends PolicyController, TopicListener {
1107
1108     }
1109
1110     /**
1111      * Simple function that takes no arguments and returns nothing.
1112      */
1113     @FunctionalInterface
1114     private static interface VoidFunction {
1115
1116         public void apply();
1117     }
1118 }