Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / test / java / org / onap / policy / drools / pooling / FeatureTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2020, 2024 Nordix Foundation
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling;
23
24 import static org.junit.jupiter.api.Assertions.assertEquals;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.when;
30 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
31
32 import com.google.gson.Gson;
33 import com.google.gson.JsonParseException;
34 import java.util.Deque;
35 import java.util.IdentityHashMap;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Properties;
39 import java.util.TreeMap;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.LinkedBlockingQueue;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.atomic.AtomicReference;
49 import lombok.Getter;
50 import org.apache.commons.lang3.tuple.Pair;
51 import org.junit.jupiter.api.AfterEach;
52 import org.junit.jupiter.api.BeforeEach;
53 import org.junit.jupiter.api.Test;
54 import org.mockito.invocation.InvocationOnMock;
55 import org.mockito.stubbing.Answer;
56 import org.onap.policy.common.endpoints.event.comm.Topic;
57 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
58 import org.onap.policy.common.endpoints.event.comm.TopicListener;
59 import org.onap.policy.common.endpoints.event.comm.TopicSink;
60 import org.onap.policy.common.endpoints.event.comm.TopicSource;
61 import org.onap.policy.drools.controller.DroolsController;
62 import org.onap.policy.drools.system.PolicyController;
63 import org.onap.policy.drools.system.PolicyEngine;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 /**
68  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
69  * its own feature object. Uses real feature objects. However, the following are not:
70  * <dl>
71  * <dt>DMaaP sources and sinks</dt>
72  * <dd>simulated using queues. There is one queue for the external topic, and one queue
73  * for each host's internal topic. Messages published to the "admin" channel are simply
74  * sent to all of the hosts' internal topic queues</dd>
75  * <dt>PolicyEngine, PolicyController, DroolsController</dt>
76  * <dd>mocked</dd>
77  * </dl>
78  *
79  * <p>Invoke {@link #runSlow()}, before the test, to slow things down.
80  */
81
82 class FeatureTest {
83     private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
84     /**
85      * Name of the topic used for inter-host communication.
86      */
87     private static final String INTERNAL_TOPIC = "my.internal.topic";
88     /**
89      * Name of the topic from which "external" events "arrive".
90      */
91     private static final String EXTERNAL_TOPIC = "my.external.topic";
92     /**
93      * Name of the controller.
94      */
95     private static final String CONTROLLER1 = "controller.one";
96     private static long stdReactivateWaitMs = 200;
97     private static long stdIdentificationMs = 60;
98     private static long stdStartHeartbeatMs = 60;
99     private static long stdActiveHeartbeatMs = 50;
100     private static long stdInterHeartbeatMs = 5;
101     private static long stdOfflinePubWaitMs = 2;
102     private static long stdPollMs = 2;
103     private static long stdInterPollMs = 2;
104     private static long stdEventWaitSec = 10;
105     /**
106      * Used to decode events from the external topic.
107      */
108     private static final Gson mapper = new Gson();
109     /**
110      * Used to identify the current context.
111      */
112     private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
113     /**
114      * Context for the current test case.
115      */
116     private Context ctx;
117
118     /**
119      * Setup.
120      */
121
122     @BeforeEach
123     public void setUp() {
124         ctx = null;
125     }
126
127     /**
128      * Tear down.
129      */
130
131     @AfterEach
132     public void tearDown() {
133         if (ctx != null) {
134             ctx.destroy();
135         }
136     }
137
138     @Test
139     void test_SingleHost() throws Exception {
140         run(70, 1);
141     }
142
143     @Test
144     void test_TwoHosts() throws Exception {
145         run(200, 2);
146     }
147
148     @Test
149     void test_ThreeHosts() throws Exception {
150         run(200, 3);
151     }
152
153     private void run(int nmessages, int nhosts) throws Exception {
154         ctx = new Context(nmessages);
155         for (int x = 0; x < nhosts; ++x) {
156             ctx.addHost();
157         }
158         ctx.startHosts();
159         for (int x = 0; x < nmessages; ++x) {
160             ctx.offerExternal(makeMessage(x));
161         }
162         ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
163         assertEquals(0, ctx.getDecodeErrors());
164         assertEquals(0, ctx.getRemainingEvents());
165         ctx.checkAllSawAMsg();
166     }
167
168     private String makeMessage(int reqnum) {
169         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
170     }
171
172     /**
173      * Invoke this to slow the timers down.
174      */
175
176     protected static void runSlow() {
177         stdReactivateWaitMs = 10000;
178         stdIdentificationMs = 10000;
179         stdStartHeartbeatMs = 15000;
180         stdActiveHeartbeatMs = 12000;
181         stdInterHeartbeatMs = 5000;
182         stdOfflinePubWaitMs = 2;
183         stdPollMs = 2;
184         stdInterPollMs = 2000;
185         stdEventWaitSec = 1000;
186     }
187
188     /**
189      * Decodes an event.
190      *
191      * @param event event
192      * @return the decoded event, or {@code null} if it cannot be decoded
193      */
194
195     private static Object decodeEvent(String event) {
196         try {
197             return mapper.fromJson(event, TreeMap.class);
198         } catch (JsonParseException e) {
199             logger.warn("cannot decode external event", e);
200             return null;
201         }
202     }
203
204     /**
205      * Context used for a single test case.
206      */
207
208     private static class Context {
209         /**
210          * Hosts that have been added to this context.
211          */
212         private final Deque<Host> hosts = new LinkedList<>();
213         /**
214          * Maps a drools controller to its policy controller.
215          */
216         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
217         /**
218          * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
219          */
220         private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
221         /**
222          * Counts the number of decode errors.
223          */
224         private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
225         /**
226          * Number of events we're still waiting to receive.
227          */
228         private final CountDownLatch eventCounter;
229
230         /**
231          * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
232          * {@link #getCurrentHost()}.
233          */
234         @Getter
235         private Host currentHost = null;
236
237         /**
238          * Constructor.
239          *
240          * @param events number of events to be processed
241          */
242
243         public Context(int events) {
244             eventCounter = new CountDownLatch(events);
245         }
246
247         /**
248          * Destroys the context, stopping any hosts that remain.
249          */
250
251         public void destroy() {
252             stopHosts();
253             hosts.clear();
254         }
255
256         /**
257          * Creates and adds a new host to the context.
258          *
259          * @return the new Host
260          */
261
262         public Host addHost() {
263             Host host = new Host(this);
264             hosts.add(host);
265             return host;
266         }
267
268         /**
269          * Starts the hosts.
270          */
271
272         public void startHosts() {
273             hosts.forEach(host -> host.start());
274         }
275
276         /**
277          * Stops the hosts.
278          */
279
280         public void stopHosts() {
281             hosts.forEach(host -> host.stop());
282         }
283
284         /**
285          * Verifies that all hosts processed at least one message.
286          */
287
288         public void checkAllSawAMsg() {
289             int msgs = 0;
290             for (Host host : hosts) {
291                 assertTrue(host.messageSeen(), "msgs=" + msgs);
292                 ++msgs;
293             }
294         }
295
296         /**
297          * Sets {@link #currentHost} to the specified host, and then invokes the given
298          * function. Resets {@link #currentHost} to {@code null} before returning.
299          *
300          * @param host host
301          * @param func function to invoke
302          */
303
304         public void withHost(Host host, VoidFunction func) {
305             currentHost = host;
306             func.apply();
307             currentHost = null;
308         }
309
310         /**
311          * Offers an event to the external topic. As each host needs a copy, it is posted
312          * to each Host's queue.
313          *
314          * @param event event
315          */
316
317         public void offerExternal(String event) {
318             for (Host host : hosts) {
319                 host.getExternalTopic().offer(event);
320             }
321         }
322
323         /**
324          * Adds an internal channel to the set of channels.
325          *
326          * @param channel channel
327          * @param queue   the channel's queue
328          */
329
330         public void addInternal(String channel, BlockingQueue<String> queue) {
331             channel2queue.put(channel, queue);
332         }
333
334         /**
335          * Offers a message to all internal channels.
336          *
337          * @param message message
338          */
339
340         public void offerInternal(String message) {
341             channel2queue.values().forEach(queue -> queue.offer(message));
342         }
343
344         /**
345          * Associates a controller with its drools controller.
346          *
347          * @param controller       controller
348          * @param droolsController drools controller
349          */
350
351         public void addController(PolicyController controller, DroolsController droolsController) {
352             drools2policy.put(droolsController, controller);
353         }
354
355         /**
356          * Get controller.
357          *
358          * @param droolsController drools controller
359          * @return the controller associated with a drools controller, or {@code null} if
360          *         it has no associated controller
361          */
362
363         public PolicyController getController(DroolsController droolsController) {
364             return drools2policy.get(droolsController);
365         }
366
367         /**
368          * Get decode errors.
369          *
370          * @return the number of decode errors so far
371          */
372
373         public int getDecodeErrors() {
374             return numDecodeErrors.get();
375         }
376
377         /**
378          * Increments the count of decode errors.
379          */
380
381         public void bumpDecodeErrors() {
382             numDecodeErrors.incrementAndGet();
383         }
384
385         /**
386          * Get remaining events.
387          *
388          * @return the number of events that haven't been processed
389          */
390
391         public long getRemainingEvents() {
392             return eventCounter.getCount();
393         }
394
395         /**
396          * Adds an event to the counter.
397          */
398
399         public void addEvent() {
400             eventCounter.countDown();
401         }
402
403         /**
404          * Waits, for a period of time, for all events to be processed.
405          *
406          * @param time  time
407          * @param units units
408          * @return {@code true} if all events have been processed, {@code false} otherwise
409          * @throws InterruptedException throws interrupted
410          */
411
412         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
413             return eventCounter.await(time, units);
414         }
415
416     }
417
418     /**
419      * Simulates a single "host".
420      */
421
422     private static class Host {
423         private final Context context;
424         private final PoolingFeature feature;
425
426         /**
427          * {@code True} if this host has processed a message, {@code false} otherwise.
428          */
429
430         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
431
432         /**
433          * This host's internal "DMaaP" topic.
434          */
435
436         private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
437
438         /**
439          * Queue for the external "DMaaP" topic.
440          */
441         @Getter
442         private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
443
444         /**
445          * Source that reads from the external topic and posts to the listener.
446          */
447
448         private TopicSource externalSource;
449
450         // mock objects
451         private final PolicyEngine engine = mock(PolicyEngine.class);
452         private final ListenerController controller = mock(ListenerController.class);
453         private final DroolsController drools = mock(DroolsController.class);
454
455         /**
456          * Constructor.
457          *
458          * @param context context
459          */
460
461         public Host(Context context) {
462             this.context = context;
463             when(controller.getName()).thenReturn(CONTROLLER1);
464             when(controller.getDrools()).thenReturn(drools);
465             // stop consuming events if the controller stops
466             when(controller.stop()).thenAnswer(args -> {
467                 externalSource.unregister(controller);
468                 return true;
469             });
470             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
471             context.addController(controller, drools);
472             // arrange to read from the external topic
473             externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
474             feature = new PoolingFeatureImpl(context);
475         }
476
477         /**
478          * Get name.
479          *
480          * @return the host name
481          */
482
483         public String getName() {
484             return feature.getHost();
485         }
486
487         /**
488          * Starts threads for the host so that it begins consuming from both the external
489          * "DMaaP" topic and its own internal "DMaaP" topic.
490          */
491
492         public void start() {
493             context.withHost(this, () -> {
494                 feature.beforeStart(engine);
495                 feature.afterCreate(controller);
496                 // assign the queue for this host's internal topic
497                 context.addInternal(getName(), msgQueue);
498                 feature.beforeStart(controller);
499                 // start consuming events from the external topic
500                 externalSource.register(controller);
501                 feature.afterStart(controller);
502             });
503         }
504
505         /**
506          * Stops the host's threads.
507          */
508
509         public void stop() {
510             feature.beforeStop(controller);
511             externalSource.unregister(controller);
512             feature.afterStop(controller);
513         }
514
515         /**
516          * Offers an event to the feature, before the policy controller handles it.
517          *
518          * @param protocol protocol
519          * @param topic2   topic
520          * @param event    event
521          * @return {@code true} if the event was handled, {@code false} otherwise
522          */
523
524         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
525             return feature.beforeOffer(controller, protocol, topic2, event);
526         }
527
528         /**
529          * Offers an event to the feature, after the policy controller handles it.
530          *
531          * @param protocol protocol
532          * @param topic    topic
533          * @param event    event
534          * @param success  success
535          * @return {@code true} if the event was handled, {@code false} otherwise
536          */
537
538         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
539             return feature.afterOffer(controller, protocol, topic, event, success);
540         }
541
542         /**
543          * Offers an event to the feature, before the drools controller handles it.
544          *
545          * @param fact fact
546          * @return {@code true} if the event was handled, {@code false} otherwise
547          */
548
549         public boolean beforeInsert(Object fact) {
550             return feature.beforeInsert(drools, fact);
551         }
552
553         /**
554          * Offers an event to the feature, after the drools controller handles it.
555          *
556          * @param fact          fact
557          * @param successInsert {@code true} if it was successfully inserted by the drools
558          *                      controller, {@code false} otherwise
559          * @return {@code true} if the event was handled, {@code false} otherwise
560          */
561
562         public boolean afterInsert(Object fact, boolean successInsert) {
563             return feature.afterInsert(drools, fact, successInsert);
564         }
565
566         /**
567          * Indicates that a message was seen for this host.
568          */
569
570         public void sawMessage() {
571             sawMsg.set(true);
572         }
573
574         /**
575          * Message seen.
576          *
577          * @return {@code true} if a message was seen for this host, {@code false} otherwise
578          */
579
580         public boolean messageSeen() {
581             return sawMsg.get();
582         }
583
584         /**
585          * Get internal queue.
586          *
587          * @return the queue associated with this host's internal topic
588          */
589
590         public BlockingQueue<String> getInternalQueue() {
591             return msgQueue;
592         }
593     }
594
595     /**
596      * Listener for the external topic. Simulates the actions taken by
597      * <i>AggregatedPolicyController.onTopicEvent</i>.
598      */
599
600     private static class MyExternalTopicListener implements Answer<Void> {
601         private final Context context;
602         private final Host host;
603
604         public MyExternalTopicListener(Context context, Host host) {
605             this.context = context;
606             this.host = host;
607         }
608
609         @Override
610         public Void answer(InvocationOnMock args) throws Throwable {
611             int index = 0;
612             CommInfrastructure commType = args.getArgument(index++);
613             String topic = args.getArgument(index++);
614             String event = args.getArgument(index++);
615             if (host.beforeOffer(commType, topic, event)) {
616                 return null;
617             }
618             boolean result;
619             Object fact = decodeEvent(event);
620             if (fact == null) {
621                 result = false;
622                 context.bumpDecodeErrors();
623             } else {
624                 result = true;
625                 if (!host.beforeInsert(fact)) {
626                     // feature did not handle it so we handle it here
627                     host.afterInsert(fact, result);
628                     host.sawMessage();
629                     context.addEvent();
630                 }
631             }
632             host.afterOffer(commType, topic, event, result);
633             return null;
634         }
635     }
636
637     /**
638      * Sink implementation that puts a message on the queue specified by the
639      * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
640      * message is placed on all queues.
641      */
642
643     private static class TopicSinkImpl extends TopicImpl implements TopicSink {
644         private final Context context;
645
646         /**
647          * Constructor.
648          *
649          * @param context context
650          */
651
652         public TopicSinkImpl(Context context) {
653             this.context = context;
654         }
655
656         @Override
657         public synchronized boolean send(String message) {
658             if (!isAlive()) {
659                 return false;
660             }
661             try {
662                 context.offerInternal(message);
663                 return true;
664             } catch (JsonParseException e) {
665                 logger.warn("could not decode message: {}", message);
666                 context.bumpDecodeErrors();
667                 return false;
668             }
669         }
670     }
671
672     /**
673      * Source implementation that reads from a queue associated with a topic.
674      */
675
676     private static class TopicSourceImpl extends TopicImpl implements TopicSource {
677
678         private final String topic;
679         /**
680          * Queue from which to retrieve messages.
681          */
682         private final BlockingQueue<String> queue;
683         /**
684          * Manages the current consumer thread. The "first" item is used as a trigger to
685          * tell the thread to stop processing, while the "second" item is triggered <i>by
686          * the thread</i> when it completes.
687          */
688         private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
689
690         /**
691          * Constructor.
692          *
693          * @param type  topic type
694          * @param queue topic from which to read
695          */
696
697         public TopicSourceImpl(String type, BlockingQueue<String> queue) {
698             this.topic = type;
699             this.queue = queue;
700         }
701
702         @Override
703         public String getTopic() {
704             return topic;
705         }
706
707         @Override
708         public boolean offer(String event) {
709             throw new UnsupportedOperationException("offer topic source");
710         }
711
712         /**
713          * Starts a thread that takes messages from the queue and gives them to the
714          * listener. Stops the thread of any previously registered listener.
715          */
716
717         @Override
718         public void register(TopicListener listener) {
719             Pair<CountDownLatch, CountDownLatch> newPair = Pair.of(new CountDownLatch(1), new CountDownLatch(1));
720             reregister(newPair);
721             Thread thread = new Thread(() -> {
722                 try {
723                     do {
724                         processMessages(newPair.getLeft(), listener);
725                     } while (!newPair.getLeft().await(stdInterPollMs, TimeUnit.MILLISECONDS));
726                     logger.info("topic source thread completed");
727                 } catch (InterruptedException e) {
728                     logger.warn("topic source thread aborted", e);
729                     Thread.currentThread().interrupt();
730                 } catch (RuntimeException e) {
731                     logger.warn("topic source thread aborted", e);
732                 }
733                 newPair.getRight().countDown();
734             });
735             thread.setDaemon(true);
736             thread.start();
737         }
738
739         /**
740          * Stops the thread of <i>any</i> currently registered listener.
741          */
742
743         @Override
744         public void unregister(TopicListener listener) {
745             reregister(null);
746         }
747
748         /**
749          * Registers a new "pair" with this source, stopping the consumer associated with
750          * any previous registration.
751          *
752          * @param newPair the new "pair", or {@code null} to unregister
753          */
754
755         private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
756             try {
757                 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
758                 if (oldPair == null) {
759                     if (newPair == null) {
760                         // unregister was invoked twice in a row
761                         logger.warn("re-unregister for topic source");
762                     }
763                     // no previous thread to stop
764                     return;
765                 }
766                 // need to stop the previous thread
767                 // tell it to stop
768                 oldPair.getLeft().countDown();
769                 // wait for it to stop
770                 if (!oldPair.getRight().await(2, TimeUnit.SECONDS)) {
771                     logger.warn("old topic registration is still running");
772                 }
773             } catch (InterruptedException e) {
774                 logger.warn("old topic registration may still be running", e);
775                 Thread.currentThread().interrupt();
776             }
777             if (newPair != null) {
778                 // register was invoked twice in a row
779                 logger.warn("re-register for topic source");
780             }
781         }
782
783         /**
784          * Polls for messages from the topic and offers them to the listener.
785          *
786          * @param stopped  triggered if processing should stop
787          * @param listener listener
788          * @throws InterruptedException throws interrupted exception
789          */
790
791         private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
792             for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
793                 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
794                 if (msg == null) {
795                     return;
796                 }
797                 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
798             }
799         }
800     }
801
802     /**
803      * Topic implementation. Most methods just throw
804      * {@link UnsupportedOperationException}.
805      */
806
807     private static class TopicImpl implements Topic {
808
809         /**
810          * Constructor.
811          */
812
813         public TopicImpl() {
814             super();
815         }
816
817         @Override
818         public String getTopic() {
819             return INTERNAL_TOPIC;
820         }
821
822         @Override
823         public String getEffectiveTopic() {
824             return INTERNAL_TOPIC;
825         }
826
827         @Override
828         public CommInfrastructure getTopicCommInfrastructure() {
829             throw new UnsupportedOperationException("topic protocol");
830         }
831
832         @Override
833         public List<String> getServers() {
834             throw new UnsupportedOperationException("topic servers");
835         }
836
837         @Override
838         public String[] getRecentEvents() {
839             throw new UnsupportedOperationException("topic events");
840         }
841
842         @Override
843         public void register(TopicListener topicListener) {
844             throw new UnsupportedOperationException("register topic");
845         }
846
847         @Override
848         public void unregister(TopicListener topicListener) {
849             throw new UnsupportedOperationException("unregister topic");
850         }
851
852         @Override
853         public synchronized boolean start() {
854             return true;
855         }
856
857         @Override
858         public synchronized boolean stop() {
859             return true;
860         }
861
862         @Override
863         public synchronized void shutdown() {
864             // do nothing
865         }
866
867         @Override
868         public synchronized boolean isAlive() {
869             return true;
870         }
871
872         @Override
873         public boolean lock() {
874             throw new UnsupportedOperationException("lock topicink");
875         }
876
877         @Override
878         public boolean unlock() {
879             throw new UnsupportedOperationException("unlock topic");
880         }
881
882         @Override
883         public boolean isLocked() {
884             throw new UnsupportedOperationException("topic isLocked");
885         }
886     }
887
888     /**
889      * Feature with overrides.
890      */
891
892     private static class PoolingFeatureImpl extends PoolingFeature {
893         private final Context context;
894
895         /**
896          * Constructor.
897          *
898          * @param context context
899          */
900
901         public PoolingFeatureImpl(Context context) {
902             this.context = context;
903             /*
904              * Note: do NOT extract anything from "context" at this point, because it
905              * hasn't been fully initialized yet
906              */
907         }
908
909         @Override
910         public Properties getProperties(String featName) {
911             Properties props = new Properties();
912             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
913             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
914             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
915             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
916             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
917             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1), "" + stdOfflinePubWaitMs);
918             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1), "" + stdStartHeartbeatMs);
919             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
920             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
921             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
922                 "" + stdActiveHeartbeatMs);
923             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), "" + stdInterHeartbeatMs);
924             return props;
925         }
926
927         @Override
928         public PolicyController getController(DroolsController droolsController) {
929             return context.getController(droolsController);
930         }
931
932         /**
933          * Embeds a specializer within a property name, after the prefix.
934          *
935          * @param propnm property name into which it should be embedded
936          * @param spec   specializer to be embedded
937          * @return the property name, with the specializer embedded within it
938          */
939
940         private String specialize(String propnm, String spec) {
941             String suffix = propnm.substring(PREFIX.length());
942             return PREFIX + spec + "." + suffix;
943         }
944
945         @Override
946         protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
947                                                  CountDownLatch activeLatch) {
948             currentContext.set(context);
949             return new PoolingManagerTest(host, controller, props, activeLatch);
950         }
951     }
952
953     /**
954      * Pooling Manager with overrides.
955      */
956
957     private static class PoolingManagerTest extends PoolingManagerImpl {
958
959         /**
960          * Constructor.
961          *
962          * @param host        the host
963          * @param controller  the controller
964          * @param props       the properties
965          * @param activeLatch the latch
966          */
967
968         public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
969                                   CountDownLatch activeLatch) {
970             super(host, controller, props, activeLatch);
971         }
972
973         @Override
974         protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
975             return new TopicMessageManagerImpl(topic);
976         }
977
978         @Override
979         protected boolean canDecodeEvent(DroolsController drools, String topic) {
980             return true;
981         }
982
983         @Override
984         protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
985             return decodeEvent(event);
986         }
987     }
988
989     /**
990      * DMaaP Manager with overrides.
991      */
992
993     private static class TopicMessageManagerImpl extends TopicMessageManager {
994
995         /**
996          * Constructor.
997          *
998          * @param topic the topic
999          * @throws PoolingFeatureException if an error occurs
1000          */
1001
1002         public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
1003             super(topic);
1004         }
1005
1006         @Override
1007         protected List<TopicSource> getTopicSources() {
1008             return List.of(
1009                 new TopicSourceImpl(INTERNAL_TOPIC, currentContext.get().getCurrentHost().getInternalQueue()));
1010         }
1011
1012         @Override
1013         protected List<TopicSink> getTopicSinks() {
1014             return List.of(new TopicSinkImpl(currentContext.get()));
1015         }
1016     }
1017
1018     /**
1019      * Controller that also implements the {@link TopicListener} interface.
1020      */
1021
1022     private static interface ListenerController extends PolicyController, TopicListener {
1023     }
1024
1025     /**
1026      * Simple function that takes no arguments and returns nothing.
1027      */
1028
1029     @FunctionalInterface
1030     private static interface VoidFunction {
1031         void apply();
1032     }
1033 }