Add comprehensive junit tests for pooling feature
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / FeatureTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
30 import java.io.IOException;
31 import java.util.Arrays;
32 import java.util.Deque;
33 import java.util.IdentityHashMap;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Properties;
37 import java.util.TreeMap;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentMap;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicInteger;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.After;
48 import org.junit.AfterClass;
49 import org.junit.Before;
50 import org.junit.BeforeClass;
51 import org.junit.Test;
52 import org.mockito.invocation.InvocationOnMock;
53 import org.mockito.stubbing.Answer;
54 import org.onap.policy.drools.controller.DroolsController;
55 import org.onap.policy.drools.event.comm.FilterableTopicSource;
56 import org.onap.policy.drools.event.comm.Topic;
57 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
58 import org.onap.policy.drools.event.comm.TopicListener;
59 import org.onap.policy.drools.event.comm.TopicSink;
60 import org.onap.policy.drools.event.comm.TopicSource;
61 import org.onap.policy.drools.pooling.message.Message;
62 import org.onap.policy.drools.system.PolicyController;
63 import org.onap.policy.drools.system.PolicyEngine;
64 import org.onap.policy.drools.utils.Pair;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67 import com.fasterxml.jackson.core.type.TypeReference;
68 import com.fasterxml.jackson.databind.ObjectMapper;
69
70 /**
71  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
72  * its own feature object. Uses real feature objects. However, the following are not:
73  * <dl>
74  * <dt>DMaaP sources and sinks</dt>
75  * <dd>simulated using queues. There is one queue for the external topic, and one queue
76  * for each host's internal topic. Messages published to the "admin" channel are simply
77  * sent to all of the hosts' internal topic queues</dd>
78  * <dt>PolicyEngine, PolicyController, DroolsController</dt>
79  * <dd>mocked</dd>
80  * </dl>
81  */
82 public class FeatureTest {
83
84     private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
85
86     /**
87      * Name of the topic used for inter-host communication.
88      */
89     private static final String INTERNAL_TOPIC = "my.internal.topic";
90
91     /**
92      * Name of the topic from which "external" events "arrive".
93      */
94     private static final String EXTERNAL_TOPIC = "my.external.topic";
95
96     /**
97      * Name of the controller.
98      */
99     private static final String CONTROLLER1 = "controller.one";
100
101     private static long stdReactivateWaitMs = 200;
102     private static long stdIdentificationMs = 60;
103     private static long stdStartHeartbeatMs = 60;
104     private static long stdActiveHeartbeatMs = 50;
105     private static long stdInterHeartbeatMs = 5;
106     private static long stdOfflinePubWaitMs = 2;
107     private static long stdPollMs = 2;
108     private static long stdInterPollMs = 2;
109     private static long stdEventWaitSec = 10;
110
111     // these are saved and restored on exit from this test class
112     private static PoolingFeature.Factory saveFeatureFactory;
113     private static PoolingManagerImpl.Factory saveManagerFactory;
114     private static DmaapManager.Factory saveDmaapFactory;
115
116     /**
117      * Context for the current test case.
118      */
119     private Context ctx;
120
121     @BeforeClass
122     public static void setUpBeforeClass() {
123         saveFeatureFactory = PoolingFeature.getFactory();
124         saveManagerFactory = PoolingManagerImpl.getFactory();
125         saveDmaapFactory = DmaapManager.getFactory();
126         
127         // note: invoke runSlow() to slow things down
128     }
129
130     @AfterClass
131     public static void tearDownAfterClass() {
132         PoolingFeature.setFactory(saveFeatureFactory);
133         PoolingManagerImpl.setFactory(saveManagerFactory);
134         DmaapManager.setFactory(saveDmaapFactory);
135     }
136
137     @Before
138     public void setUp() {
139         ctx = null;
140     }
141
142     @After
143     public void tearDown() {
144         if (ctx != null) {
145             ctx.destroy();
146         }
147     }
148
149     @Test
150     public void test_SingleHost() throws Exception {
151         run(70, 1);
152     }
153
154     @Test
155     public void test_TwoHosts() throws Exception {
156         run(200, 2);
157     }
158
159     @Test
160     public void test_ThreeHosts() throws Exception {
161         run(200, 3);
162     }
163
164     private void run(int nmessages, int nhosts) throws Exception {
165         ctx = new Context(nmessages);
166
167         for (int x = 0; x < nhosts; ++x) {
168             ctx.addHost();
169         }
170
171         ctx.startHosts();
172
173         for (int x = 0; x < nmessages; ++x) {
174             ctx.offerExternal(makeMessage(x));
175         }
176
177         ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
178
179         assertEquals(0, ctx.getDecodeErrors());
180         assertEquals(0, ctx.getRemainingEvents());
181         ctx.checkAllSawAMsg();
182     }
183
184     private String makeMessage(int reqnum) {
185         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
186     }
187     
188     /**
189      * Invoke this to slow the timers down.
190      */
191     protected static void runSlow() {
192          stdReactivateWaitMs = 10000;
193          stdIdentificationMs = 10000;
194          stdStartHeartbeatMs = 15000;
195          stdActiveHeartbeatMs = 12000;
196          stdInterHeartbeatMs = 5000;
197          stdOfflinePubWaitMs = 2;
198          stdPollMs = 2;
199          stdInterPollMs = 2000;
200          stdEventWaitSec = 1000;
201     }
202
203     /**
204      * Context used for a single test case.
205      */
206     private static class Context {
207
208         private final FeatureFactory featureFactory;
209         private final ManagerFactory managerFactory;
210         private final DmaapFactory dmaapFactory;
211
212         /**
213          * Hosts that have been added to this context.
214          */
215         private final Deque<Host> hosts = new LinkedList<>();
216
217         /**
218          * Maps a drools controller to its policy controller.
219          */
220         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
221
222         /**
223          * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
224          */
225         private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
226
227         /**
228          * Queue for the external "DMaaP" topic.
229          */
230         private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
231
232         /**
233          * Counts the number of decode errors.
234          */
235         private final AtomicInteger nDecodeErrors = new AtomicInteger(0);
236
237         /**
238          * Number of events we're still waiting to receive.
239          */
240         private final CountDownLatch eventCounter;
241
242         /**
243          * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
244          * {@link #getCurrentHost()}.
245          */
246         private Host currentHost = null;
247
248         /**
249          * 
250          * @param nEvents number of events to be processed
251          */
252         public Context(int nEvents) {
253             featureFactory = new FeatureFactory(this);
254             managerFactory = new ManagerFactory(this);
255             dmaapFactory = new DmaapFactory(this);
256             eventCounter = new CountDownLatch(nEvents);
257
258             PoolingFeature.setFactory(featureFactory);
259             PoolingManagerImpl.setFactory(managerFactory);
260             DmaapManager.setFactory(dmaapFactory);
261         }
262
263         /**
264          * Destroys the context, stopping any hosts that remain.
265          */
266         public void destroy() {
267             stopHosts();
268             hosts.clear();
269         }
270
271         /**
272          * Creates and adds a new host to the context.
273          * 
274          * @return the new Host
275          */
276         public Host addHost() {
277             Host host = new Host(this);
278             hosts.add(host);
279
280             return host;
281         }
282
283         /**
284          * Starts the hosts.
285          */
286         public void startHosts() {
287             hosts.forEach(host -> host.start());
288         }
289
290         /**
291          * Stops the hosts.
292          */
293         public void stopHosts() {
294             hosts.forEach(host -> host.stop());
295         }
296
297         /**
298          * Verifies that all hosts processed at least one message.
299          */
300         public void checkAllSawAMsg() {
301             int x = 0;
302             for (Host host : hosts) {
303                 assertTrue("x=" + x, host.messageSeen());
304                 ++x;
305             }
306         }
307
308         /**
309          * Sets {@link #currentHost} to the specified host, and then invokes the given
310          * function. Resets {@link #currentHost} to {@code null} before returning.
311          * 
312          * @param host
313          * @param func function to invoke
314          */
315         public void withHost(Host host, VoidFunction func) {
316             currentHost = host;
317             func.apply();
318             currentHost = null;
319         }
320
321         /**
322          * Offers an event to the external topic.
323          * 
324          * @param event
325          */
326         public void offerExternal(String event) {
327             externalTopic.offer(event);
328         }
329
330         /**
331          * Adds an internal channel to the set of channels.
332          * 
333          * @param channel
334          * @param queue the channel's queue
335          */
336         public void addInternal(String channel, BlockingQueue<String> queue) {
337             channel2queue.put(channel, queue);
338         }
339
340         /**
341          * Offers a message to all internal channels.
342          * 
343          * @param message
344          */
345         public void offerInternal(String message) {
346             channel2queue.values().forEach(queue -> queue.offer(message));
347         }
348
349         /**
350          * Offers amessage to an internal channel.
351          * 
352          * @param channel
353          * @param message
354          */
355         public void offerInternal(String channel, String message) {
356             BlockingQueue<String> queue = channel2queue.get(channel);
357             if (queue != null) {
358                 queue.offer(message);
359             }
360         }
361
362         /**
363          * Decodes an event.
364          * 
365          * @param event
366          * @return the decoded event, or {@code null} if it cannot be decoded
367          */
368         public Object decodeEvent(String event) {
369             return managerFactory.decodeEvent(null, null, event);
370         }
371
372         /**
373          * Associates a controller with its drools controller.
374          * 
375          * @param controller
376          * @param droolsController
377          */
378         public void addController(PolicyController controller, DroolsController droolsController) {
379             drools2policy.put(droolsController, controller);
380         }
381
382         /**
383          * @param droolsController
384          * @return the controller associated with a drools controller, or {@code null} if
385          *         it has no associated controller
386          */
387         public PolicyController getController(DroolsController droolsController) {
388             return drools2policy.get(droolsController);
389         }
390
391         /**
392          * @return queue for the external topic
393          */
394         public BlockingQueue<String> getExternalTopic() {
395             return externalTopic;
396         }
397
398         /**
399          * 
400          * @return the number of decode errors so far
401          */
402         public int getDecodeErrors() {
403             return nDecodeErrors.get();
404         }
405
406         /**
407          * Increments the count of decode errors.
408          */
409         public void bumpDecodeErrors() {
410             nDecodeErrors.incrementAndGet();
411         }
412
413         /**
414          * 
415          * @return the number of events that haven't been processed
416          */
417         public long getRemainingEvents() {
418             return eventCounter.getCount();
419         }
420
421         /**
422          * Adds an event to the counter.
423          */
424         public void addEvent() {
425             eventCounter.countDown();
426         }
427
428         /**
429          * Waits, for a period of time, for all events to be processed.
430          * 
431          * @param time
432          * @param units
433          * @return {@code true} if all events have been processed, {@code false} otherwise
434          * @throws InterruptedException
435          */
436         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
437             return eventCounter.await(time, units);
438         }
439
440         /**
441          * Gets the current host, provided this is used from within a call to
442          * {@link #withHost(Host, VoidFunction)}.
443          * 
444          * @return the current host, or {@code null} if there is no current host
445          */
446         public Host getCurrentHost() {
447             return currentHost;
448         }
449     }
450
451     /**
452      * Simulates a single "host".
453      */
454     private static class Host {
455
456         private final Context context;
457
458         private final PoolingFeature feature = new PoolingFeature();
459
460         /**
461          * {@code True} if this host has processed a message, {@code false} otherwise.
462          */
463         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
464
465         /**
466          * This host's internal "DMaaP" topic.
467          */
468         private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
469
470         /**
471          * Source that reads from the external topic and posts to the listener.
472          */
473         private TopicSource externalSource;
474
475         // mock objects
476         private final PolicyEngine engine = mock(PolicyEngine.class);
477         private final ListenerController controller = mock(ListenerController.class);
478         private final DroolsController drools = mock(DroolsController.class);
479
480         /**
481          * 
482          * @param context
483          */
484         public Host(Context context) {
485             this.context = context;
486
487             when(controller.getName()).thenReturn(CONTROLLER1);
488             when(controller.getDrools()).thenReturn(drools);
489
490             // stop consuming events if the controller stops
491             when(controller.stop()).thenAnswer(args -> {
492                 externalSource.unregister(controller);
493                 return true;
494             });
495
496             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
497
498             context.addController(controller, drools);
499
500             // arrange to read from the external topic
501             externalSource = new TopicSourceImpl(context, false);
502         }
503
504         /**
505          * @return the host name
506          */
507         public String getName() {
508             return feature.getHost();
509         }
510
511         /**
512          * Starts threads for the host so that it begins consuming from both the external
513          * "DMaaP" topic and its own internal "DMaaP" topic.
514          */
515         public void start() {
516
517             context.withHost(this, () -> {
518
519                 feature.beforeStart(engine);
520                 feature.afterCreate(controller);
521
522                 // assign the queue for this host's internal topic
523                 context.addInternal(getName(), msgQueue);
524
525                 feature.beforeStart(controller);
526
527                 // start consuming events from the external topic
528                 externalSource.register(controller);
529
530                 feature.afterStart(controller);
531             });
532         }
533
534         /**
535          * Stops the host's threads.
536          */
537         public void stop() {
538             feature.beforeStop(controller);
539             externalSource.unregister(controller);
540             feature.afterStop(controller);
541         }
542
543         /**
544          * Offers an event to the feature, before the policy controller handles it.
545          * 
546          * @param protocol
547          * @param topic2
548          * @param event
549          * @return {@code true} if the event was handled, {@code false} otherwise
550          */
551         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
552             return feature.beforeOffer(controller, protocol, topic2, event);
553         }
554
555         /**
556          * Offers an event to the feature, after the policy controller handles it.
557          * 
558          * @param protocol
559          * @param topic
560          * @param event
561          * @param success
562          * @return {@code true} if the event was handled, {@code false} otherwise
563          */
564         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
565
566             return feature.afterOffer(controller, protocol, topic, event, success);
567         }
568
569         /**
570          * Offers an event to the feature, before the drools controller handles it.
571          * 
572          * @param fact
573          * @return {@code true} if the event was handled, {@code false} otherwise
574          */
575         public boolean beforeInsert(Object fact) {
576             return feature.beforeInsert(drools, fact);
577         }
578
579         /**
580          * Offers an event to the feature, after the drools controller handles it.
581          * 
582          * @param fact
583          * @param successInsert {@code true} if it was successfully inserted by the drools
584          *        controller, {@code false} otherwise
585          * @return {@code true} if the event was handled, {@code false} otherwise
586          */
587         public boolean afterInsert(Object fact, boolean successInsert) {
588             return feature.afterInsert(drools, fact, successInsert);
589         }
590
591         /**
592          * Indicates that a message was seen for this host.
593          */
594         public void sawMessage() {
595             sawMsg.set(true);
596         }
597
598         /**
599          * 
600          * @return {@code true} if a message was seen for this host, {@code false}
601          *         otherwise
602          */
603         public boolean messageSeen() {
604             return sawMsg.get();
605         }
606
607         /**
608          * @return the queue associated with this host's internal topic
609          */
610         public BlockingQueue<String> getInternalQueue() {
611             return msgQueue;
612         }
613     }
614
615     /**
616      * Listener for the external topic. Simulates the actions taken by
617      * <i>AggregatedPolicyController.onTopicEvent</i>.
618      */
619     private static class MyExternalTopicListener implements Answer<Void> {
620
621         private final Context context;
622         private final Host host;
623
624         public MyExternalTopicListener(Context context, Host host) {
625             this.context = context;
626             this.host = host;
627         }
628
629         @Override
630         public Void answer(InvocationOnMock args) throws Throwable {
631             int i = 0;
632             CommInfrastructure commType = args.getArgument(i++);
633             String topic = args.getArgument(i++);
634             String event = args.getArgument(i++);
635
636             if (host.beforeOffer(commType, topic, event)) {
637                 return null;
638             }
639
640             boolean result;
641             Object fact = context.decodeEvent(event);
642
643             if (fact == null) {
644                 result = false;
645                 context.bumpDecodeErrors();
646
647             } else {
648                 result = true;
649
650                 if (!host.beforeInsert(fact)) {
651                     // feature did not handle it so we handle it here
652                     host.afterInsert(fact, result);
653
654                     host.sawMessage();
655                     context.addEvent();
656                 }
657             }
658
659             host.afterOffer(commType, topic, event, result);
660             return null;
661         }
662     }
663
664     /**
665      * Sink implementation that puts a message on the queue specified by the
666      * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
667      * message is placed on all queues.
668      */
669     private static class TopicSinkImpl extends TopicImpl implements TopicSink {
670
671         private final Context context;
672
673         /**
674          * Used to decode the messages so that the channel can be extracted.
675          */
676         private final Serializer serializer = new Serializer();
677
678         /**
679          * 
680          * @param context
681          */
682         public TopicSinkImpl(Context context) {
683             this.context = context;
684         }
685
686         @Override
687         public synchronized boolean send(String message) {
688             if (!isAlive()) {
689                 return false;
690             }
691
692             try {
693                 Message msg = serializer.decodeMsg(message);
694                 String channel = msg.getChannel();
695
696                 if (Message.ADMIN.equals(channel)) {
697                     // add to every queue
698                     context.offerInternal(message);
699
700                 } else {
701                     // add to a specific queue
702                     context.offerInternal(channel, message);
703                 }
704
705                 return true;
706
707             } catch (IOException e) {
708                 logger.warn("could not decode message: {}", message);
709                 context.bumpDecodeErrors();
710                 return false;
711             }
712         }
713     }
714
715     /**
716      * Source implementation that reads from a queue associated with a topic.
717      */
718     private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
719
720         private final String topic;
721
722         /**
723          * Queue from which to retrieve messages.
724          */
725         private final BlockingQueue<String> queue;
726
727         /**
728          * Manages the current consumer thread. The "first" item is used as a trigger to
729          * tell the thread to stop processing, while the "second" item is triggered <i>by
730          * the thread</i> when it completes.
731          */
732         private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
733
734         /**
735          * 
736          * @param context
737          * @param internal {@code true} if to read from the internal topic, {@code false}
738          *        to read from the external topic
739          */
740         public TopicSourceImpl(Context context, boolean internal) {
741             if (internal) {
742                 this.topic = INTERNAL_TOPIC;
743                 this.queue = context.getCurrentHost().getInternalQueue();
744
745             } else {
746                 this.topic = EXTERNAL_TOPIC;
747                 this.queue = context.getExternalTopic();
748             }
749         }
750
751         @Override
752         public void setFilter(String filter) {
753             logger.info("topic filter set to: {}", filter);
754         }
755
756         @Override
757         public String getTopic() {
758             return topic;
759         }
760
761         @Override
762         public boolean offer(String event) {
763             throw new UnsupportedOperationException("offer topic source");
764         }
765
766         /**
767          * Starts a thread that takes messages from the queue and gives them to the
768          * listener. Stops the thread of any previously registered listener.
769          */
770         @Override
771         public void register(TopicListener listener) {
772             Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1));
773
774             reregister(newPair);
775
776             Thread thread = new Thread(() -> {
777
778                 try {
779                     do {
780                         processMessages(newPair.first(), listener);
781                     } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
782
783                     logger.info("topic source thread completed");
784
785                 } catch (InterruptedException e) {
786                     logger.warn("topic source thread aborted", e);
787                     Thread.currentThread().interrupt();
788
789                 } catch (RuntimeException e) {
790                     logger.warn("topic source thread aborted", e);
791                 }
792
793                 newPair.second().countDown();
794
795             });
796
797             thread.setDaemon(true);
798             thread.start();
799         }
800
801         /**
802          * Stops the thread of <i>any</i> currently registered listener.
803          */
804         @Override
805         public void unregister(TopicListener listener) {
806             reregister(null);
807         }
808
809         /**
810          * Registers a new "pair" with this source, stopping the consumer associated with
811          * any previous registration.
812          * 
813          * @param newPair the new "pair", or {@code null} to unregister
814          */
815         private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
816             try {
817                 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
818                 if (oldPair == null) {
819                     if (newPair == null) {
820                         // unregister was invoked twice in a row
821                         logger.warn("re-unregister for topic source");
822                     }
823
824                     // no previous thread to stop
825                     return;
826                 }
827
828                 // need to stop the previous thread
829
830                 // tell it to stop
831                 oldPair.first().countDown();
832
833                 // wait for it to stop
834                 if (!oldPair.second().await(2, TimeUnit.SECONDS)) {
835                     logger.warn("old topic registration is still running");
836                 }
837
838             } catch (InterruptedException e) {
839                 logger.warn("old topic registration may still be running", e);
840                 Thread.currentThread().interrupt();
841             }
842
843             if (newPair != null) {
844                 // register was invoked twice in a row
845                 logger.warn("re-register for topic source");
846             }
847         }
848
849         /**
850          * Polls for messages from the topic and offers them to the listener.
851          * 
852          * @param stopped triggered if processing should stop
853          * @param listener
854          * @throws InterruptedException
855          */
856         private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
857
858             for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
859
860                 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
861                 if (msg == null) {
862                     return;
863                 }
864
865                 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
866             }
867         }
868     }
869
870     /**
871      * Topic implementation. Most methods just throw
872      * {@link UnsupportedOperationException}.
873      */
874     private static class TopicImpl implements Topic {
875
876         /**
877          * {@code True} if this topic is alive/running, {@code false} otherwise.
878          */
879         private boolean alive = false;
880
881         /**
882          * 
883          */
884         public TopicImpl() {
885             super();
886         }
887
888         @Override
889         public String getTopic() {
890             return INTERNAL_TOPIC;
891         }
892
893         @Override
894         public CommInfrastructure getTopicCommInfrastructure() {
895             throw new UnsupportedOperationException("topic protocol");
896         }
897
898         @Override
899         public List<String> getServers() {
900             throw new UnsupportedOperationException("topic servers");
901         }
902
903         @Override
904         public String[] getRecentEvents() {
905             throw new UnsupportedOperationException("topic events");
906         }
907
908         @Override
909         public void register(TopicListener topicListener) {
910             throw new UnsupportedOperationException("register topic");
911         }
912
913         @Override
914         public void unregister(TopicListener topicListener) {
915             throw new UnsupportedOperationException("unregister topic");
916         }
917
918         @Override
919         public synchronized boolean start() {
920             if (alive) {
921                 throw new IllegalStateException("topic already started");
922             }
923
924             alive = true;
925             return true;
926         }
927
928         @Override
929         public synchronized boolean stop() {
930             if (!alive) {
931                 throw new IllegalStateException("topic is not running");
932             }
933
934             alive = false;
935             return true;
936         }
937
938         @Override
939         public synchronized void shutdown() {
940             alive = false;
941         }
942
943         @Override
944         public synchronized boolean isAlive() {
945             return alive;
946         }
947
948         @Override
949         public boolean lock() {
950             throw new UnsupportedOperationException("lock topicink");
951         }
952
953         @Override
954         public boolean unlock() {
955             throw new UnsupportedOperationException("unlock topic");
956         }
957
958         @Override
959         public boolean isLocked() {
960             throw new UnsupportedOperationException("topic isLocked");
961         }
962     }
963
964     /**
965      * Simulator for the feature-level factory.
966      */
967     private static class FeatureFactory extends PoolingFeature.Factory {
968
969         private final Context context;
970
971         /**
972          * 
973          * @param context
974          */
975         public FeatureFactory(Context context) {
976             this.context = context;
977
978             /*
979              * Note: do NOT extract anything from "context" at this point, because it
980              * hasn't been fully initialized yet
981              */
982         }
983
984         @Override
985         public Properties getProperties(String featName) {
986             Properties props = new Properties();
987
988             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
989
990             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
991             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
992             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
993             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
994             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
995                             "" + stdOfflinePubWaitMs);
996             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
997                             "" + stdStartHeartbeatMs);
998             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
999             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
1000             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
1001                             "" + stdActiveHeartbeatMs);
1002             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
1003                             "" + stdInterHeartbeatMs);
1004
1005             return props;
1006         }
1007
1008         @Override
1009         public PolicyController getController(DroolsController droolsController) {
1010             return context.getController(droolsController);
1011         }
1012     }
1013
1014     /**
1015      * Simulator for the pooling manager factory.
1016      */
1017     private static class ManagerFactory extends PoolingManagerImpl.Factory {
1018
1019         /**
1020          * Used to decode events from the external topic.
1021          */
1022         private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
1023             @Override
1024             protected ObjectMapper initialValue() {
1025                 return new ObjectMapper();
1026             }
1027         };
1028
1029         /**
1030          * Used to decode events into a Map.
1031          */
1032         private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {};
1033
1034         /**
1035          * 
1036          * @param context
1037          */
1038         public ManagerFactory(Context context) {
1039
1040             /*
1041              * Note: do NOT extract anything from "context" at this point, because it
1042              * hasn't been fully initialized yet
1043              */
1044         }
1045
1046         @Override
1047         public boolean canDecodeEvent(DroolsController drools, String topic) {
1048             return true;
1049         }
1050
1051         @Override
1052         public Object decodeEvent(DroolsController drools, String topic, String event) {
1053             try {
1054                 return mapper.get().readValue(event, typeRef);
1055
1056             } catch (IOException e) {
1057                 logger.warn("cannot decode external event", e);
1058                 return null;
1059             }
1060         }
1061     }
1062
1063     /**
1064      * Simulator for the dmaap manager factory.
1065      */
1066     private static class DmaapFactory extends DmaapManager.Factory {
1067
1068         private final Context context;
1069
1070         /**
1071          * 
1072          * @param context
1073          */
1074         public DmaapFactory(Context context) {
1075             this.context = context;
1076
1077             /*
1078              * Note: do NOT extract anything from "context" at this point, because it
1079              * hasn't been fully initialized yet
1080              */
1081         }
1082
1083         @Override
1084         public List<TopicSource> initTopicSources(Properties props) {
1085             return Arrays.asList(new TopicSourceImpl(context, true));
1086         }
1087
1088         @Override
1089         public List<TopicSink> initTopicSinks(Properties props) {
1090             return Arrays.asList(new TopicSinkImpl(context));
1091         }
1092     }
1093
1094     /**
1095      * Controller that also implements the {@link TopicListener} interface.
1096      */
1097     private static interface ListenerController extends PolicyController, TopicListener {
1098
1099     }
1100
1101     /**
1102      * Simple function that takes no arguments and returns nothing.
1103      */
1104     @FunctionalInterface
1105     private static interface VoidFunction {
1106
1107         public void apply();
1108     }
1109 }