Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / test / java / org / onap / policy / drools / pooling / EndToEndFeatureTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling;
23
24 import static org.junit.jupiter.api.Assertions.assertEquals;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.when;
30 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
31
32 import com.google.gson.Gson;
33 import com.google.gson.JsonParseException;
34 import java.util.Arrays;
35 import java.util.Deque;
36 import java.util.IdentityHashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Properties;
40 import java.util.TreeMap;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import org.junit.jupiter.api.AfterAll;
46 import org.junit.jupiter.api.AfterEach;
47 import org.junit.jupiter.api.BeforeAll;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Disabled;
50 import org.junit.jupiter.api.Test;
51 import org.mockito.invocation.InvocationOnMock;
52 import org.mockito.stubbing.Answer;
53 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
54 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
55 import org.onap.policy.common.endpoints.event.comm.TopicListener;
56 import org.onap.policy.common.endpoints.event.comm.TopicSink;
57 import org.onap.policy.common.endpoints.event.comm.TopicSource;
58 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
59 import org.onap.policy.drools.controller.DroolsController;
60 import org.onap.policy.drools.system.PolicyController;
61 import org.onap.policy.drools.system.PolicyEngine;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own
67  * feature object. Uses real feature objects, as well as real UEB topic sources and sinks. However, the
68  * following are not: <dl> <dt>PolicyEngine, PolicyController, DroolsController</dt> <dd>mocked</dd>
69  * </dl>
70  *
71  * <p>The following fields must be set before executing this: <ul> <li>UEB_SERVERS</li>
72  * <li>INTERNAL_TOPIC</li> <li>EXTERNAL_TOPIC</li> </ul>
73  */
74 public class EndToEndFeatureTest {
75
76     private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class);
77
78     /**
79      * UEB servers for both the internal and the external topics.
80      */
81     private static final String UEB_SERVERS = "ueb-server";
82
83     /**
84      * Name of the topic used for inter-host communication.
85      */
86     private static final String INTERNAL_TOPIC = "internal-topic";
87
88     /**
89      * Name of the topic from which "external" events "arrive".
90      */
91     private static final String EXTERNAL_TOPIC = "external-topic";
92
93     /**
94      * Consumer group to use when polling the external topic.
95      */
96     private static final String EXTERNAL_GROUP = EndToEndFeatureTest.class.getName();
97
98     /**
99      * Name of the controller.
100      */
101     private static final String CONTROLLER1 = "controller.one";
102
103     /**
104      * Maximum number of items to fetch from a topic in a single poll.
105      */
106     private static final String FETCH_LIMIT = "5";
107
108     private static final long STD_REACTIVATE_WAIT_MS = 10000;
109     private static final long STD_IDENTIFICATION_MS = 10000;
110     private static final long STD_START_HEARTBEAT_MS = 15000;
111     private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
112     private static final long STD_INTER_HEARTBEAT_MS = 5000;
113     private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
114     private static final long EVENT_WAIT_SEC = 15;
115
116     /**
117      * Used to decode events from the external topic.
118      */
119     private static final Gson mapper = new Gson();
120
121     /**
122      * Used to identify the current host.
123      */
124     private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>();
125
126     /**
127      * Sink for the external topic.
128      */
129     private static TopicSink externalSink;
130
131     /**
132      * Sink for the internal topic.
133      */
134     private static TopicSink internalSink;
135
136     /**
137      * Context for the current test case.
138      */
139     private Context ctx;
140
141     /**
142      * Setup before class.
143      *
144      */
145     @BeforeAll
146     public static void setUpBeforeClass() {
147         externalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
148         externalSink.start();
149
150         internalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
151         internalSink.start();
152     }
153
154     /**
155      * Tear down after class.
156      *
157      */
158     @AfterAll
159     public static void tearDownAfterClass() {
160         externalSink.stop();
161         internalSink.stop();
162     }
163
164     /**
165      * Setup.
166      */
167     @BeforeEach
168     public void setUp() {
169         ctx = null;
170     }
171
172     /**
173      * Tear down.
174      */
175     @AfterEach
176     public void tearDown() {
177         if (ctx != null) {
178             ctx.destroy();
179         }
180     }
181
182     /*
183      * This test should only be run manually, after configuring all the fields,
184      * thus it is ignored.
185      */
186     @Disabled
187     @Test
188     public void test_SingleHost() throws Exception {    // NOSONAR
189         run(70, 1);
190     }
191
192     /*
193      * This test should only be run manually, after configuring all the fields,
194      * thus it is ignored.
195      */
196     @Disabled
197     @Test
198     public void test_TwoHosts() throws Exception {      // NOSONAR
199         run(200, 2);
200     }
201
202     /*
203      * This test should only be run manually, after configuring all the fields,
204      * thus it is ignored.
205      */
206     @Disabled
207     @Test
208     public void test_ThreeHosts() throws Exception {    // NOSONAR
209         run(200, 3);
210     }
211
212     private void run(int nmessages, int nhosts) throws Exception {
213         ctx = new Context(nmessages);
214
215         for (int x = 0; x < nhosts; ++x) {
216             ctx.addHost();
217         }
218
219         ctx.startHosts();
220         ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
221
222         for (int x = 0; x < nmessages; ++x) {
223             ctx.offerExternal(makeMessage(x));
224         }
225
226         ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
227
228         assertEquals(0, ctx.getDecodeErrors());
229         assertEquals(0, ctx.getRemainingEvents());
230         ctx.checkAllSawAMsg();
231     }
232
233     private String makeMessage(int reqnum) {
234         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
235     }
236
237     private static Properties makeSinkProperties(String topic) {
238         Properties props = new Properties();
239
240         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic);
241
242         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
243                 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
244         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
245                 + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
246         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
247                 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
248
249         return props;
250     }
251
252     private static Properties makeSourceProperties(String topic) {
253         Properties props = new Properties();
254
255         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
256
257         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
258                 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
259         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
260                 + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
261         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
262                 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
263
264         if (EXTERNAL_TOPIC.equals(topic)) {
265             // consumer group is a constant
266             props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
267                     + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
268
269             // consumer instance is generated by the BusConsumer code
270         }
271
272         // else internal topic: feature populates info for internal topic
273
274         return props;
275     }
276
277     /**
278      * Decodes an event.
279      *
280      * @param event event
281      * @return the decoded event, or {@code null} if it cannot be decoded
282      */
283     private static Object decodeEvent(String event) {
284         try {
285             return mapper.fromJson(event, TreeMap.class);
286
287         } catch (JsonParseException e) {
288             logger.warn("cannot decode external event", e);
289             return null;
290         }
291     }
292
293     /**
294      * Context used for a single test case.
295      */
296     private static class Context {
297
298         /**
299          * Hosts that have been added to this context.
300          */
301         private final Deque<Host> hosts = new LinkedList<>();
302
303         /**
304          * Maps a drools controller to its policy controller.
305          */
306         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
307
308         /**
309          * Counts the number of decode errors.
310          */
311         private final AtomicInteger decodeErrors = new AtomicInteger(0);
312
313         /**
314          * Number of events we're still waiting to receive.
315          */
316         private final CountDownLatch eventCounter;
317
318         /**
319          * Constructor.
320          *
321          * @param events number of events to be processed
322          */
323         public Context(int events) {
324             eventCounter = new CountDownLatch(events);
325         }
326
327         /**
328          * Destroys the context, stopping any hosts that remain.
329          */
330         public void destroy() {
331             stopHosts();
332             hosts.clear();
333         }
334
335         /**
336          * Creates and adds a new host to the context.
337          *
338          * @return the new Host
339          */
340         public Host addHost() {
341             Host host = new Host(this);
342             hosts.add(host);
343
344             return host;
345         }
346
347         /**
348          * Starts the hosts.
349          */
350         public void startHosts() {
351             hosts.forEach(host -> host.start());
352         }
353
354         /**
355          * Stops the hosts.
356          */
357         public void stopHosts() {
358             hosts.forEach(host -> host.stop());
359         }
360
361         /**
362          * Verifies that all hosts processed at least one message.
363          */
364         public void checkAllSawAMsg() {
365             int msgs = 0;
366             for (Host host : hosts) {
367                 assertTrue(host.messageSeen(), "msgs=" + msgs);
368                 ++msgs;
369             }
370         }
371
372         /**
373          * Offers an event to the external topic.
374          *
375          * @param event event
376          */
377         public void offerExternal(String event) {
378             externalSink.send(event);
379         }
380
381         /**
382          * Associates a controller with its drools controller.
383          *
384          * @param controller controller
385          * @param droolsController drools controller
386          */
387         public void addController(PolicyController controller, DroolsController droolsController) {
388             drools2policy.put(droolsController, controller);
389         }
390
391         /**
392          * Get controller.
393          *
394          * @param droolsController drools controller
395          * @return the controller associated with a drools controller, or {@code null} if it has no
396          *         associated controller
397          */
398         public PolicyController getController(DroolsController droolsController) {
399             return drools2policy.get(droolsController);
400         }
401
402         /**
403          * Get decode errors.
404          *
405          * @return the number of decode errors so far
406          */
407         public int getDecodeErrors() {
408             return decodeErrors.get();
409         }
410
411         /**
412          * Increments the count of decode errors.
413          */
414         public void bumpDecodeErrors() {
415             decodeErrors.incrementAndGet();
416         }
417
418         /**
419          * Get remaining events.
420          *
421          * @return the number of events that haven't been processed
422          */
423         public long getRemainingEvents() {
424             return eventCounter.getCount();
425         }
426
427         /**
428          * Adds an event to the counter.
429          */
430         public void addEvent() {
431             eventCounter.countDown();
432         }
433
434         /**
435          * Waits, for a period of time, for all events to be processed.
436          *
437          * @param time time
438          * @param units units
439          * @return {@code true} if all events have been processed, {@code false} otherwise
440          * @throws InterruptedException throws interrupted exception
441          */
442         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
443             return eventCounter.await(time, units);
444         }
445
446         /**
447          * Waits, for a period of time, for all hosts to enter the Active state.
448          *
449          * @param timeMs maximum time to wait, in milliseconds
450          * @throws InterruptedException throws interrupted exception
451          */
452         public void awaitAllActive(long timeMs) throws InterruptedException {
453             long tend = timeMs + System.currentTimeMillis();
454
455             for (Host host : hosts) {
456                 long tremain = Math.max(0, tend - System.currentTimeMillis());
457                 assertTrue(host.awaitActive(tremain));
458             }
459         }
460     }
461
462     /**
463      * Simulates a single "host".
464      */
465     private static class Host {
466
467         private final PoolingFeature feature;
468
469         /**
470          * {@code True} if this host has processed a message, {@code false} otherwise.
471          */
472         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
473
474         private final TopicSource externalSource;
475         private final TopicSource internalSource;
476
477         // mock objects
478         private final PolicyEngine engine = mock(PolicyEngine.class);
479         private final ListenerController controller = mock(ListenerController.class);
480         private final DroolsController drools = mock(DroolsController.class);
481
482         /**
483          * Constructor.
484          *
485          * @param context context
486          */
487         public Host(Context context) {
488
489             when(controller.getName()).thenReturn(CONTROLLER1);
490             when(controller.getDrools()).thenReturn(drools);
491
492             externalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(EXTERNAL_TOPIC))
493                             .get(0);
494             internalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(INTERNAL_TOPIC))
495                             .get(0);
496
497             // stop consuming events if the controller stops
498             when(controller.stop()).thenAnswer(args -> {
499                 externalSource.unregister(controller);
500                 return true;
501             });
502
503             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
504
505             context.addController(controller, drools);
506
507             feature = new PoolingFeatureImpl(context, this);
508         }
509
510         /**
511          * Waits, for a period of time, for the host to enter the Active state.
512          *
513          * @param timeMs time to wait, in milliseconds
514          * @return {@code true} if the host entered the Active state within the given amount of
515          *         time, {@code false} otherwise
516          * @throws InterruptedException throws interrupted exception
517          */
518         public boolean awaitActive(long timeMs) throws InterruptedException {
519             return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
520         }
521
522         /**
523          * Starts threads for the host so that it begins consuming from both the external "DMaaP"
524          * topic and its own internal "DMaaP" topic.
525          */
526         public void start() {
527             feature.beforeStart(engine);
528             feature.afterCreate(controller);
529
530             feature.beforeStart(controller);
531
532             // start consuming events from the external topic
533             externalSource.register(controller);
534
535             feature.afterStart(controller);
536         }
537
538         /**
539          * Stops the host's threads.
540          */
541         public void stop() {
542             feature.beforeStop(controller);
543             externalSource.unregister(controller);
544             feature.afterStop(controller);
545         }
546
547         /**
548          * Offers an event to the feature, before the policy controller handles it.
549          *
550          * @param protocol protocol
551          * @param topic2 topic
552          * @param event event
553          * @return {@code true} if the event was handled, {@code false} otherwise
554          */
555         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
556             return feature.beforeOffer(controller, protocol, topic2, event);
557         }
558
559         /**
560          * Offers an event to the feature, after the policy controller handles it.
561          *
562          * @param protocol protocol
563          * @param topic topic
564          * @param event event
565          * @param success success
566          * @return {@code true} if the event was handled, {@code false} otherwise
567          */
568         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
569
570             return feature.afterOffer(controller, protocol, topic, event, success);
571         }
572
573         /**
574          * Offers an event to the feature, before the drools controller handles it.
575          *
576          * @param fact fact
577          * @return {@code true} if the event was handled, {@code false} otherwise
578          */
579         public boolean beforeInsert(Object fact) {
580             return feature.beforeInsert(drools, fact);
581         }
582
583         /**
584          * Offers an event to the feature, after the drools controller handles it.
585          *
586          * @param fact fact
587          * @param successInsert {@code true} if it was successfully inserted by the drools
588          *        controller, {@code false} otherwise
589          * @return {@code true} if the event was handled, {@code false} otherwise
590          */
591         public boolean afterInsert(Object fact, boolean successInsert) {
592             return feature.afterInsert(drools, fact, successInsert);
593         }
594
595         /**
596          * Indicates that a message was seen for this host.
597          */
598         public void sawMessage() {
599             sawMsg.set(true);
600         }
601
602         /**
603          * Message seen.
604          *
605          * @return {@code true} if a message was seen for this host, {@code false} otherwise
606          */
607         public boolean messageSeen() {
608             return sawMsg.get();
609         }
610     }
611
612     /**
613      * Listener for the external topic. Simulates the actions taken by
614      * <i>AggregatedPolicyController.onTopicEvent</i>.
615      */
616     private static class MyExternalTopicListener implements Answer<Void> {
617
618         private final Context context;
619         private final Host host;
620
621         public MyExternalTopicListener(Context context, Host host) {
622             this.context = context;
623             this.host = host;
624         }
625
626         @Override
627         public Void answer(InvocationOnMock args) throws Throwable {
628             int index = 0;
629             CommInfrastructure commType = args.getArgument(index++);
630             String topic = args.getArgument(index++);
631             String event = args.getArgument(index++);
632
633             if (host.beforeOffer(commType, topic, event)) {
634                 return null;
635             }
636
637             boolean result;
638             Object fact = decodeEvent(event);
639
640             if (fact == null) {
641                 result = false;
642                 context.bumpDecodeErrors();
643
644             } else {
645                 result = true;
646
647                 if (!host.beforeInsert(fact)) {
648                     // feature did not handle it so we handle it here
649                     host.afterInsert(fact, result);
650
651                     host.sawMessage();
652                     context.addEvent();
653                 }
654             }
655
656             host.afterOffer(commType, topic, event, result);
657             return null;
658         }
659     }
660
661     /**
662      * Feature with overrides.
663      */
664     private static class PoolingFeatureImpl extends PoolingFeature {
665
666         private final Context context;
667         private final Host host;
668
669         /**
670          * Constructor.
671          *
672          * @param context context
673          */
674         public PoolingFeatureImpl(Context context, Host host) {
675             this.context = context;
676             this.host = host;
677
678             /*
679              * Note: do NOT extract anything from "context" at this point, because it hasn't been
680              * fully initialized yet
681              */
682         }
683
684         @Override
685         public Properties getProperties(String featName) {
686             Properties props = new Properties();
687
688             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
689
690             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
691             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
692             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
693             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
694             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
695                     "" + STD_OFFLINE_PUB_WAIT_MS);
696             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
697                     "" + STD_START_HEARTBEAT_MS);
698             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
699             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
700             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
701                     "" + STD_ACTIVE_HEARTBEAT_MS);
702             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
703                     "" + STD_INTER_HEARTBEAT_MS);
704
705             props.putAll(makeSinkProperties(INTERNAL_TOPIC));
706             props.putAll(makeSourceProperties(INTERNAL_TOPIC));
707
708             return props;
709         }
710
711         @Override
712         public PolicyController getController(DroolsController droolsController) {
713             return context.getController(droolsController);
714         }
715
716         /**
717          * Embeds a specializer within a property name, after the prefix.
718          *
719          * @param propnm property name into which it should be embedded
720          * @param spec specializer to be embedded
721          * @return the property name, with the specializer embedded within it
722          */
723         private String specialize(String propnm, String spec) {
724             String suffix = propnm.substring(PREFIX.length());
725             return PREFIX + spec + "." + suffix;
726         }
727
728         @Override
729         protected PoolingManagerImpl makeManager(String hostName, PolicyController controller, PoolingProperties props,
730                         CountDownLatch activeLatch) {
731
732             /*
733              * Set this before creating the test, because the test's superclass
734              * constructor uses it before the test object has a chance to store it.
735              */
736             currentHost.set(host);
737
738             return new PoolingManagerTest(hostName, controller, props, activeLatch);
739         }
740     }
741
742     /**
743      * Pooling Manager with overrides.
744      */
745     private static class PoolingManagerTest extends PoolingManagerImpl {
746
747         /**
748          * Constructor.
749          *
750          * @param hostName the host
751          * @param controller the controller
752          * @param props the properties
753          * @param activeLatch the latch
754          */
755         public PoolingManagerTest(String hostName, PolicyController controller,
756                         PoolingProperties props, CountDownLatch activeLatch) {
757
758             super(hostName, controller, props, activeLatch);
759         }
760
761         @Override
762         protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
763             return new TopicMessageManagerImpl(topic);
764         }
765
766         @Override
767         protected boolean canDecodeEvent(DroolsController drools, String topic) {
768             return true;
769         }
770
771         @Override
772         protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
773             return decodeEvent(event);
774         }
775     }
776
777     /**
778      * DMaaP Manager with overrides.
779      */
780     private static class TopicMessageManagerImpl extends TopicMessageManager {
781
782         /**
783          * Constructor.
784          *
785          * @param topic the topic
786          * @throws PoolingFeatureException if an error occurs
787          */
788         public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
789             super(topic);
790         }
791
792         @Override
793         protected List<TopicSource> getTopicSources() {
794             Host host = currentHost.get();
795             return Arrays.asList(host.internalSource, host.externalSource);
796         }
797
798         @Override
799         protected List<TopicSink> getTopicSinks() {
800             return Arrays.asList(internalSink, externalSink);
801         }
802     }
803
804     /**
805      * Controller that also implements the {@link TopicListener} interface.
806      */
807     private static interface ListenerController extends PolicyController, TopicListener {
808
809     }
810 }