Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / FeatureTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2020 Nordix Foundation
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.when;
30 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
31
32 import com.google.gson.Gson;
33 import com.google.gson.JsonParseException;
34 import java.util.Arrays;
35 import java.util.Deque;
36 import java.util.IdentityHashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Properties;
40 import java.util.TreeMap;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentMap;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.LinkedBlockingQueue;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicInteger;
49 import java.util.concurrent.atomic.AtomicReference;
50 import lombok.Getter;
51 import org.apache.commons.lang3.tuple.Pair;
52 import org.junit.After;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.mockito.invocation.InvocationOnMock;
56 import org.mockito.stubbing.Answer;
57 import org.onap.policy.common.endpoints.event.comm.Topic;
58 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
59 import org.onap.policy.common.endpoints.event.comm.TopicListener;
60 import org.onap.policy.common.endpoints.event.comm.TopicSink;
61 import org.onap.policy.common.endpoints.event.comm.TopicSource;
62 import org.onap.policy.drools.controller.DroolsController;
63 import org.onap.policy.drools.system.PolicyController;
64 import org.onap.policy.drools.system.PolicyEngine;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 /**
69  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
70  * its own feature object. Uses real feature objects. However, the following are not:
71  * <dl>
72  * <dt>DMaaP sources and sinks</dt>
73  * <dd>simulated using queues. There is one queue for the external topic, and one queue
74  * for each host's internal topic. Messages published to the "admin" channel are simply
75  * sent to all of the hosts' internal topic queues</dd>
76  * <dt>PolicyEngine, PolicyController, DroolsController</dt>
77  * <dd>mocked</dd>
78  * </dl>
79  *
80  * <p>Invoke {@link #runSlow()}, before the test, to slow things down.
81  */
82
83 public class FeatureTest {
84     private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
85     /**
86      * Name of the topic used for inter-host communication.
87      */
88     private static final String INTERNAL_TOPIC = "my.internal.topic";
89     /**
90      * Name of the topic from which "external" events "arrive".
91      */
92     private static final String EXTERNAL_TOPIC = "my.external.topic";
93     /**
94      * Name of the controller.
95      */
96     private static final String CONTROLLER1 = "controller.one";
97     private static long stdReactivateWaitMs = 200;
98     private static long stdIdentificationMs = 60;
99     private static long stdStartHeartbeatMs = 60;
100     private static long stdActiveHeartbeatMs = 50;
101     private static long stdInterHeartbeatMs = 5;
102     private static long stdOfflinePubWaitMs = 2;
103     private static long stdPollMs = 2;
104     private static long stdInterPollMs = 2;
105     private static long stdEventWaitSec = 10;
106     /**
107      * Used to decode events from the external topic.
108      */
109     private static final Gson mapper = new Gson();
110     /**
111      * Used to identify the current context.
112      */
113     private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
114     /**
115      * Context for the current test case.
116      */
117     private Context ctx;
118     /**
119      * Setup.
120      */
121
122     @Before
123     public void setUp() {
124         ctx = null;
125     }
126     /**
127      * Tear down.
128      */
129
130     @After
131     public void tearDown() {
132         if (ctx != null) {
133             ctx.destroy();
134         }
135     }
136
137     @Test
138     public void test_SingleHost() throws Exception {
139         run(70, 1);
140     }
141
142     @Test
143     public void test_TwoHosts() throws Exception {
144         run(200, 2);
145     }
146
147     @Test
148     public void test_ThreeHosts() throws Exception {
149         run(200, 3);
150     }
151
152     private void run(int nmessages, int nhosts) throws Exception {
153         ctx = new Context(nmessages);
154         for (int x = 0; x < nhosts; ++x) {
155             ctx.addHost();
156         }
157         ctx.startHosts();
158         for (int x = 0; x < nmessages; ++x) {
159             ctx.offerExternal(makeMessage(x));
160         }
161         ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
162         assertEquals(0, ctx.getDecodeErrors());
163         assertEquals(0, ctx.getRemainingEvents());
164         ctx.checkAllSawAMsg();
165     }
166
167     private String makeMessage(int reqnum) {
168         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
169     }
170     /**
171      * Invoke this to slow the timers down.
172      */
173
174     protected static void runSlow() {
175         stdReactivateWaitMs = 10000;
176         stdIdentificationMs = 10000;
177         stdStartHeartbeatMs = 15000;
178         stdActiveHeartbeatMs = 12000;
179         stdInterHeartbeatMs = 5000;
180         stdOfflinePubWaitMs = 2;
181         stdPollMs = 2;
182         stdInterPollMs = 2000;
183         stdEventWaitSec = 1000;
184     }
185     /**
186      * Decodes an event.
187      *
188      * @param event event
189      * @return the decoded event, or {@code null} if it cannot be decoded
190      */
191
192     private static Object decodeEvent(String event) {
193         try {
194             return mapper.fromJson(event, TreeMap.class);
195         } catch (JsonParseException e) {
196             logger.warn("cannot decode external event", e);
197             return null;
198         }
199     }
200     /**
201      * Context used for a single test case.
202      */
203
204     private static class Context {
205         /**
206          * Hosts that have been added to this context.
207          */
208         private final Deque<Host> hosts = new LinkedList<>();
209         /**
210          * Maps a drools controller to its policy controller.
211          */
212         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
213         /**
214          * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
215          */
216         private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
217         /**
218          * Counts the number of decode errors.
219          */
220         private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
221         /**
222          * Number of events we're still waiting to receive.
223          */
224         private final CountDownLatch eventCounter;
225         /**
226          * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
227          * {@link #getCurrentHost()}.
228          */
229         private Host currentHost = null;
230
231         /**
232          * Constructor.
233          *
234          * @param events number of events to be processed
235          */
236
237         public Context(int events) {
238             eventCounter = new CountDownLatch(events);
239         }
240         /**
241          * Destroys the context, stopping any hosts that remain.
242          */
243
244         public void destroy() {
245             stopHosts();
246             hosts.clear();
247         }
248
249         /**
250          * Creates and adds a new host to the context.
251          *
252          * @return the new Host
253          */
254
255         public Host addHost() {
256             Host host = new Host(this);
257             hosts.add(host);
258             return host;
259         }
260
261         /**
262          * Starts the hosts.
263          */
264
265         public void startHosts() {
266             hosts.forEach(host -> host.start());
267         }
268
269         /**
270          * Stops the hosts.
271          */
272
273         public void stopHosts() {
274             hosts.forEach(host -> host.stop());
275         }
276
277         /**
278          * Verifies that all hosts processed at least one message.
279          */
280
281         public void checkAllSawAMsg() {
282             int msgs = 0;
283             for (Host host : hosts) {
284                 assertTrue("msgs=" + msgs, host.messageSeen());
285                 ++msgs;
286             }
287         }
288
289         /**
290          * Sets {@link #currentHost} to the specified host, and then invokes the given
291          * function. Resets {@link #currentHost} to {@code null} before returning.
292          *
293          * @param host host
294          * @param func function to invoke
295          */
296
297         public void withHost(Host host, VoidFunction func) {
298             currentHost = host;
299             func.apply();
300             currentHost = null;
301         }
302
303         /**
304          * Offers an event to the external topic. As each host needs a copy, it is posted
305          * to each Host's queue.
306          *
307          * @param event event
308          */
309
310         public void offerExternal(String event) {
311             for (Host host : hosts) {
312                 host.getExternalTopic().offer(event);
313             }
314         }
315
316         /**
317          * Adds an internal channel to the set of channels.
318          *
319          * @param channel channel
320          * @param queue the channel's queue
321          */
322
323         public void addInternal(String channel, BlockingQueue<String> queue) {
324             channel2queue.put(channel, queue);
325         }
326
327         /**
328          * Offers a message to all internal channels.
329          *
330          * @param message message
331          */
332
333         public void offerInternal(String message) {
334             channel2queue.values().forEach(queue -> queue.offer(message));
335         }
336
337         /**
338          * Associates a controller with its drools controller.
339          *
340          * @param controller controller
341          * @param droolsController drools controller
342          */
343
344         public void addController(PolicyController controller, DroolsController droolsController) {
345             drools2policy.put(droolsController, controller);
346         }
347
348         /**
349          * Get controller.
350          *
351          * @param droolsController drools controller
352          * @return the controller associated with a drools controller, or {@code null} if
353          *         it has no associated controller
354          */
355
356         public PolicyController getController(DroolsController droolsController) {
357             return drools2policy.get(droolsController);
358         }
359
360         /**
361          * Get decode errors.
362          *
363          * @return the number of decode errors so far
364          */
365
366         public int getDecodeErrors() {
367             return numDecodeErrors.get();
368         }
369
370         /**
371          * Increments the count of decode errors.
372          */
373
374         public void bumpDecodeErrors() {
375             numDecodeErrors.incrementAndGet();
376         }
377
378         /**
379          * Get remaining events.
380          *
381          * @return the number of events that haven't been processed
382          */
383
384         public long getRemainingEvents() {
385             return eventCounter.getCount();
386         }
387
388         /**
389          * Adds an event to the counter.
390          */
391
392         public void addEvent() {
393             eventCounter.countDown();
394         }
395
396         /**
397          * Waits, for a period of time, for all events to be processed.
398          *
399          * @param time time
400          * @param units units
401          * @return {@code true} if all events have been processed, {@code false} otherwise
402          * @throws InterruptedException throws interrupted
403          */
404
405         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
406             return eventCounter.await(time, units);
407         }
408
409         /**
410          * Gets the current host, provided this is used from within a call to
411          * {@link #withHost(Host, VoidFunction)}.
412          *
413          * @return the current host, or {@code null} if there is no current host
414          */
415
416         public Host getCurrentHost() {
417             return currentHost;
418         }
419     }
420
421     /**
422      * Simulates a single "host".
423      */
424
425     private static class Host {
426         private final Context context;
427         private final PoolingFeature feature;
428
429         /**
430          * {@code True} if this host has processed a message, {@code false} otherwise.
431          */
432
433         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
434
435         /**
436          * This host's internal "DMaaP" topic.
437          */
438
439         private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
440
441         /**
442          * Queue for the external "DMaaP" topic.
443          */
444         @Getter
445         private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
446
447         /**
448          * Source that reads from the external topic and posts to the listener.
449          */
450
451         private TopicSource externalSource;
452
453         // mock objects
454         private final PolicyEngine engine = mock(PolicyEngine.class);
455         private final ListenerController controller = mock(ListenerController.class);
456         private final DroolsController drools = mock(DroolsController.class);
457
458         /**
459          * Constructor.
460          *
461          * @param context context
462          */
463
464         public Host(Context context) {
465             this.context = context;
466             when(controller.getName()).thenReturn(CONTROLLER1);
467             when(controller.getDrools()).thenReturn(drools);
468             // stop consuming events if the controller stops
469             when(controller.stop()).thenAnswer(args -> {
470                 externalSource.unregister(controller);
471                 return true;
472             });
473             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
474             context.addController(controller, drools);
475             // arrange to read from the external topic
476             externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
477             feature = new PoolingFeatureImpl(context);
478         }
479
480         /**
481          * Get name.
482          *
483          * @return the host name
484          */
485
486         public String getName() {
487             return feature.getHost();
488         }
489
490         /**
491          * Starts threads for the host so that it begins consuming from both the external
492          * "DMaaP" topic and its own internal "DMaaP" topic.
493          */
494
495         public void start() {
496             context.withHost(this, () -> {
497                 feature.beforeStart(engine);
498                 feature.afterCreate(controller);
499                 // assign the queue for this host's internal topic
500                 context.addInternal(getName(), msgQueue);
501                 feature.beforeStart(controller);
502                 // start consuming events from the external topic
503                 externalSource.register(controller);
504                 feature.afterStart(controller);
505             });
506         }
507
508         /**
509          * Stops the host's threads.
510          */
511
512         public void stop() {
513             feature.beforeStop(controller);
514             externalSource.unregister(controller);
515             feature.afterStop(controller);
516         }
517
518         /**
519          * Offers an event to the feature, before the policy controller handles it.
520          *
521          * @param protocol protocol
522          * @param topic2 topic
523          * @param event event
524          * @return {@code true} if the event was handled, {@code false} otherwise
525          */
526
527         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
528             return feature.beforeOffer(controller, protocol, topic2, event);
529         }
530
531         /**
532          * Offers an event to the feature, after the policy controller handles it.
533          *
534          * @param protocol protocol
535          * @param topic topic
536          * @param event event
537          * @param success success
538          * @return {@code true} if the event was handled, {@code false} otherwise
539          */
540
541         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
542             return feature.afterOffer(controller, protocol, topic, event, success);
543         }
544
545         /**
546          * Offers an event to the feature, before the drools controller handles it.
547          *
548          * @param fact fact
549          * @return {@code true} if the event was handled, {@code false} otherwise
550          */
551
552         public boolean beforeInsert(Object fact) {
553             return feature.beforeInsert(drools, fact);
554         }
555
556         /**
557          * Offers an event to the feature, after the drools controller handles it.
558          *
559          * @param fact fact
560          * @param successInsert {@code true} if it was successfully inserted by the drools
561          *        controller, {@code false} otherwise
562          * @return {@code true} if the event was handled, {@code false} otherwise
563          */
564
565         public boolean afterInsert(Object fact, boolean successInsert) {
566             return feature.afterInsert(drools, fact, successInsert);
567         }
568
569         /**
570          * Indicates that a message was seen for this host.
571          */
572
573         public void sawMessage() {
574             sawMsg.set(true);
575         }
576
577         /**
578          * Message seen.
579          *
580          * @return {@code true} if a message was seen for this host, {@code false}
581          *         otherwise
582          */
583
584         public boolean messageSeen() {
585             return sawMsg.get();
586         }
587
588         /**
589          * Get internal queue.
590          *
591          * @return the queue associated with this host's internal topic
592          */
593
594         public BlockingQueue<String> getInternalQueue() {
595             return msgQueue;
596         }
597     }
598
599     /**
600      * Listener for the external topic. Simulates the actions taken by
601      * <i>AggregatedPolicyController.onTopicEvent</i>.
602      */
603
604     private static class MyExternalTopicListener implements Answer<Void> {
605         private final Context context;
606         private final Host host;
607
608         public MyExternalTopicListener(Context context, Host host) {
609             this.context = context;
610             this.host = host;
611         }
612
613         @Override
614         public Void answer(InvocationOnMock args) throws Throwable {
615             int index = 0;
616             CommInfrastructure commType = args.getArgument(index++);
617             String topic = args.getArgument(index++);
618             String event = args.getArgument(index++);
619             if (host.beforeOffer(commType, topic, event)) {
620                 return null;
621             }
622             boolean result;
623             Object fact = decodeEvent(event);
624             if (fact == null) {
625                 result = false;
626                 context.bumpDecodeErrors();
627             } else {
628                 result = true;
629                 if (!host.beforeInsert(fact)) {
630                     // feature did not handle it so we handle it here
631                     host.afterInsert(fact, result);
632                     host.sawMessage();
633                     context.addEvent();
634                 }
635             }
636             host.afterOffer(commType, topic, event, result);
637             return null;
638         }
639     }
640
641     /**
642      * Sink implementation that puts a message on the queue specified by the
643      * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
644      * message is placed on all queues.
645      */
646
647     private static class TopicSinkImpl extends TopicImpl implements TopicSink {
648         private final Context context;
649
650         /**
651          * Constructor.
652          *
653          * @param context context
654          */
655
656         public TopicSinkImpl(Context context) {
657             this.context = context;
658         }
659
660         @Override
661         public synchronized boolean send(String message) {
662             if (!isAlive()) {
663                 return false;
664             }
665             try {
666                 context.offerInternal(message);
667                 return true;
668             } catch (JsonParseException e) {
669                 logger.warn("could not decode message: {}", message);
670                 context.bumpDecodeErrors();
671                 return false;
672             }
673         }
674     }
675
676     /**
677      * Source implementation that reads from a queue associated with a topic.
678      */
679
680     private static class TopicSourceImpl extends TopicImpl implements TopicSource {
681
682         private final String topic;
683         /**
684          * Queue from which to retrieve messages.
685          */
686         private final BlockingQueue<String> queue;
687         /**
688          * Manages the current consumer thread. The "first" item is used as a trigger to
689          * tell the thread to stop processing, while the "second" item is triggered <i>by
690          * the thread</i> when it completes.
691          */
692         private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
693
694         /**
695          * Constructor.
696          *
697          * @param type topic type
698          * @param queue topic from which to read
699          */
700
701         public TopicSourceImpl(String type, BlockingQueue<String> queue) {
702             this.topic = type;
703             this.queue = queue;
704         }
705
706         @Override
707         public String getTopic() {
708             return topic;
709         }
710
711         @Override
712         public boolean offer(String event) {
713             throw new UnsupportedOperationException("offer topic source");
714         }
715
716         /**
717          * Starts a thread that takes messages from the queue and gives them to the
718          * listener. Stops the thread of any previously registered listener.
719          */
720
721         @Override
722         public void register(TopicListener listener) {
723             Pair<CountDownLatch, CountDownLatch> newPair = Pair.of(new CountDownLatch(1), new CountDownLatch(1));
724             reregister(newPair);
725             Thread thread = new Thread(() -> {
726                 try {
727                     do {
728                         processMessages(newPair.getLeft(), listener);
729                     } while (!newPair.getLeft().await(stdInterPollMs, TimeUnit.MILLISECONDS));
730                     logger.info("topic source thread completed");
731                 } catch (InterruptedException e) {
732                     logger.warn("topic source thread aborted", e);
733                     Thread.currentThread().interrupt();
734                 } catch (RuntimeException e) {
735                     logger.warn("topic source thread aborted", e);
736                 }
737                 newPair.getRight().countDown();
738             });
739             thread.setDaemon(true);
740             thread.start();
741         }
742
743         /**
744          * Stops the thread of <i>any</i> currently registered listener.
745          */
746
747         @Override
748         public void unregister(TopicListener listener) {
749             reregister(null);
750         }
751
752         /**
753          * Registers a new "pair" with this source, stopping the consumer associated with
754          * any previous registration.
755          *
756          * @param newPair the new "pair", or {@code null} to unregister
757          */
758
759         private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
760             try {
761                 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
762                 if (oldPair == null) {
763                     if (newPair == null) {
764                         // unregister was invoked twice in a row
765                         logger.warn("re-unregister for topic source");
766                     }
767                     // no previous thread to stop
768                     return;
769                 }
770                 // need to stop the previous thread
771                 // tell it to stop
772                 oldPair.getLeft().countDown();
773                 // wait for it to stop
774                 if (!oldPair.getRight().await(2, TimeUnit.SECONDS)) {
775                     logger.warn("old topic registration is still running");
776                 }
777             } catch (InterruptedException e) {
778                 logger.warn("old topic registration may still be running", e);
779                 Thread.currentThread().interrupt();
780             }
781             if (newPair != null) {
782                 // register was invoked twice in a row
783                 logger.warn("re-register for topic source");
784             }
785         }
786
787         /**
788          * Polls for messages from the topic and offers them to the listener.
789          *
790          * @param stopped triggered if processing should stop
791          * @param listener listener
792          * @throws InterruptedException throws interrupted exception
793          */
794
795         private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
796             for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
797                 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
798                 if (msg == null) {
799                     return;
800                 }
801                 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
802             }
803         }
804     }
805
806     /**
807      * Topic implementation. Most methods just throw
808      * {@link UnsupportedOperationException}.
809      */
810
811     private static class TopicImpl implements Topic {
812
813         /**
814          * Constructor.
815          */
816
817         public TopicImpl() {
818             super();
819         }
820
821         @Override
822         public String getTopic() {
823             return INTERNAL_TOPIC;
824         }
825
826         @Override
827         public String getEffectiveTopic() {
828             return INTERNAL_TOPIC;
829         }
830
831         @Override
832         public CommInfrastructure getTopicCommInfrastructure() {
833             throw new UnsupportedOperationException("topic protocol");
834         }
835
836         @Override
837         public List<String> getServers() {
838             throw new UnsupportedOperationException("topic servers");
839         }
840
841         @Override
842         public String[] getRecentEvents() {
843             throw new UnsupportedOperationException("topic events");
844         }
845
846         @Override
847         public void register(TopicListener topicListener) {
848             throw new UnsupportedOperationException("register topic");
849         }
850
851         @Override
852         public void unregister(TopicListener topicListener) {
853             throw new UnsupportedOperationException("unregister topic");
854         }
855
856         @Override
857         public synchronized boolean start() {
858             return true;
859         }
860
861         @Override
862         public synchronized boolean stop() {
863             return true;
864         }
865
866         @Override
867         public synchronized void shutdown() {
868             // do nothing
869         }
870
871         @Override
872         public synchronized boolean isAlive() {
873             return true;
874         }
875
876         @Override
877         public boolean lock() {
878             throw new UnsupportedOperationException("lock topicink");
879         }
880
881         @Override
882         public boolean unlock() {
883             throw new UnsupportedOperationException("unlock topic");
884         }
885
886         @Override
887         public boolean isLocked() {
888             throw new UnsupportedOperationException("topic isLocked");
889         }
890     }
891
892     /**
893      * Feature with overrides.
894      */
895
896     private static class PoolingFeatureImpl extends PoolingFeature {
897         private final Context context;
898
899         /**
900          * Constructor.
901          *
902          * @param context context
903          */
904
905         public PoolingFeatureImpl(Context context) {
906             this.context = context;
907             /*
908              * Note: do NOT extract anything from "context" at this point, because it
909              * hasn't been fully initialized yet
910              */
911         }
912
913         @Override
914         public Properties getProperties(String featName) {
915             Properties props = new Properties();
916             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
917             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
918             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
919             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
920             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
921             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
922                             "" + stdOfflinePubWaitMs);
923             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
924                             "" + stdStartHeartbeatMs);
925             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
926             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
927             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
928                             "" + stdActiveHeartbeatMs);
929             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
930                             "" + stdInterHeartbeatMs);
931             return props;
932         }
933
934         @Override
935         public PolicyController getController(DroolsController droolsController) {
936             return context.getController(droolsController);
937         }
938
939         /**
940          * Embeds a specializer within a property name, after the prefix.
941          *
942          * @param propnm property name into which it should be embedded
943          * @param spec specializer to be embedded
944          * @return the property name, with the specializer embedded within it
945          */
946
947         private String specialize(String propnm, String spec) {
948             String suffix = propnm.substring(PREFIX.length());
949             return PREFIX + spec + "." + suffix;
950         }
951
952         @Override
953         protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
954                         CountDownLatch activeLatch) {
955             currentContext.set(context);
956             return new PoolingManagerTest(host, controller, props, activeLatch);
957         }
958     }
959
960     /**
961      * Pooling Manager with overrides.
962      */
963
964     private static class PoolingManagerTest extends PoolingManagerImpl {
965
966         /**
967          * Constructor.
968          *
969          * @param host the host
970          * @param controller the controller
971          * @param props the properties
972          * @param activeLatch the latch
973          */
974
975         public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
976                         CountDownLatch activeLatch) {
977             super(host, controller, props, activeLatch);
978         }
979
980         @Override
981         protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
982             return new DmaapManagerImpl(topic);
983         }
984
985         @Override
986         protected boolean canDecodeEvent(DroolsController drools, String topic) {
987             return true;
988         }
989
990         @Override
991         protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
992             return decodeEvent(event);
993         }
994     }
995
996     /**
997      * DMaaP Manager with overrides.
998      */
999
1000     private static class DmaapManagerImpl extends DmaapManager {
1001
1002         /**
1003          * Constructor.
1004          *
1005          * @param topic the topic
1006          * @throws PoolingFeatureException if an error occurs
1007          */
1008
1009         public DmaapManagerImpl(String topic) throws PoolingFeatureException {
1010             super(topic);
1011         }
1012
1013         @Override
1014         protected List<TopicSource> getTopicSources() {
1015             return Arrays.asList(new TopicSourceImpl(INTERNAL_TOPIC,
1016                             currentContext.get().getCurrentHost().getInternalQueue()));
1017         }
1018
1019         @Override
1020         protected List<TopicSink> getTopicSinks() {
1021             return Arrays.asList(new TopicSinkImpl(currentContext.get()));
1022         }
1023     }
1024
1025     /**
1026      * Controller that also implements the {@link TopicListener} interface.
1027      */
1028
1029     private static interface ListenerController extends PolicyController, TopicListener {
1030     }
1031
1032     /**
1033      * Simple function that takes no arguments and returns nothing.
1034      */
1035
1036     @FunctionalInterface
1037     private static interface VoidFunction {
1038         void apply();
1039     }
1040 }