362c3b01cf140ffd74da6df27d04e3af96e542a9
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / EndToEndFeatureTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
30
31 import com.fasterxml.jackson.core.type.TypeReference;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Deque;
36 import java.util.IdentityHashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Properties;
40 import java.util.TreeMap;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import org.junit.After;
46 import org.junit.AfterClass;
47 import org.junit.Before;
48 import org.junit.BeforeClass;
49 import org.junit.Ignore;
50 import org.junit.Test;
51 import org.mockito.invocation.InvocationOnMock;
52 import org.mockito.stubbing.Answer;
53 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
54 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
55 import org.onap.policy.common.endpoints.event.comm.TopicListener;
56 import org.onap.policy.common.endpoints.event.comm.TopicSink;
57 import org.onap.policy.common.endpoints.event.comm.TopicSource;
58 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
59 import org.onap.policy.drools.controller.DroolsController;
60 import org.onap.policy.drools.system.PolicyController;
61 import org.onap.policy.drools.system.PolicyEngine;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own
67  * feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the
68  * following are not: <dl> <dt>PolicyEngine, PolicyController, DroolsController</dt> <dd>mocked</dd>
69  * </dl>
70  *
71  * <p>The following fields must be set before executing this: <ul> <li>UEB_SERVERS</li>
72  * <li>INTERNAL_TOPIC</li> <li>EXTERNAL_TOPIC</li> </ul>
73  */
74 public class EndToEndFeatureTest {
75
76     private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class);
77
78     /**
79      * UEB servers for both internal & external topics.
80      */
81     private static final String UEB_SERVERS = "";
82
83     /**
84      * Name of the topic used for inter-host communication.
85      */
86     private static final String INTERNAL_TOPIC = "";
87
88     /**
89      * Name of the topic from which "external" events "arrive".
90      */
91     private static final String EXTERNAL_TOPIC = "";
92
93     /**
94      * Consumer group to use when polling the external topic.
95      */
96     private static final String EXTERNAL_GROUP = EndToEndFeatureTest.class.getName();
97
98     /**
99      * Name of the controller.
100      */
101     private static final String CONTROLLER1 = "controller.one";
102
103     /**
104      * Maximum number of items to fetch from DMaaP in a single poll.
105      */
106     private static final String FETCH_LIMIT = "5";
107
108     private static final long STD_REACTIVATE_WAIT_MS = 10000;
109     private static final long STD_IDENTIFICATION_MS = 10000;
110     private static final long STD_START_HEARTBEAT_MS = 15000;
111     private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
112     private static final long STD_INTER_HEARTBEAT_MS = 5000;
113     private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
114     private static final long EVENT_WAIT_SEC = 15;
115
116     /**
117      * Used to decode events into a Map.
118      */
119     private static final TypeReference<TreeMap<String, String>> typeRef =
120                     new TypeReference<TreeMap<String, String>>() {};
121
122     /**
123      * Used to decode events from the external topic.
124      */
125     private static final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
126         @Override
127         protected ObjectMapper initialValue() {
128             return new ObjectMapper();
129         }
130     };
131
132     /**
133      * Used to identify the current host.
134      */
135     private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>();
136
137     /**
138      * Sink for external DMaaP topic.
139      */
140     private static TopicSink externalSink;
141
142     /**
143      * Sink for internal DMaaP topic.
144      */
145     private static TopicSink internalSink;
146
147     /**
148      * Context for the current test case.
149      */
150     private Context ctx;
151
152     /**
153      * Setup before class.
154      *
155      */
156     @BeforeClass
157     public static void setUpBeforeClass() {
158         externalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
159         externalSink.start();
160
161         internalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
162         internalSink.start();
163     }
164
165     /**
166      * Tear down after class.
167      *
168      */
169     @AfterClass
170     public static void tearDownAfterClass() {
171         externalSink.stop();
172         internalSink.stop();
173     }
174
175     /**
176      * Setup.
177      */
178     @Before
179     public void setUp() {
180         ctx = null;
181     }
182
183     /**
184      * Tear down.
185      */
186     @After
187     public void tearDown() {
188         if (ctx != null) {
189             ctx.destroy();
190         }
191     }
192
193     @Ignore
194     @Test
195     public void test_SingleHost() throws Exception {
196         run(70, 1);
197     }
198
199     @Ignore
200     @Test
201     public void test_TwoHosts() throws Exception {
202         run(200, 2);
203     }
204
205     @Ignore
206     @Test
207     public void test_ThreeHosts() throws Exception {
208         run(200, 3);
209     }
210
211     private void run(int nmessages, int nhosts) throws Exception {
212         ctx = new Context(nmessages);
213
214         for (int x = 0; x < nhosts; ++x) {
215             ctx.addHost();
216         }
217
218         ctx.startHosts();
219         ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
220
221         for (int x = 0; x < nmessages; ++x) {
222             ctx.offerExternal(makeMessage(x));
223         }
224
225         ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
226
227         assertEquals(0, ctx.getDecodeErrors());
228         assertEquals(0, ctx.getRemainingEvents());
229         ctx.checkAllSawAMsg();
230     }
231
232     private String makeMessage(int reqnum) {
233         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
234     }
235
236     private static Properties makeSinkProperties(String topic) {
237         Properties props = new Properties();
238
239         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic);
240
241         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
242                 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
243         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
244                 + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
245         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
246                 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
247
248         return props;
249     }
250
251     private static Properties makeSourceProperties(String topic) {
252         Properties props = new Properties();
253
254         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
255
256         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
257                 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
258         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
259                 + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
260         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
261                 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
262
263         if (EXTERNAL_TOPIC.equals(topic)) {
264             // consumer group is a constant
265             props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
266                     + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
267
268             // consumer instance is generated by the BusConsumer code
269         }
270
271         // else internal topic: feature populates info for internal topic
272
273         return props;
274     }
275
276     /**
277      * Decodes an event.
278      *
279      * @param event event
280      * @return the decoded event, or {@code null} if it cannot be decoded
281      */
282     private static Object decodeEvent(String event) {
283         try {
284             return mapper.get().readValue(event, typeRef);
285
286         } catch (IOException e) {
287             logger.warn("cannot decode external event", e);
288             return null;
289         }
290     }
291
292     /**
293      * Context used for a single test case.
294      */
295     private static class Context {
296
297         /**
298          * Hosts that have been added to this context.
299          */
300         private final Deque<Host> hosts = new LinkedList<>();
301
302         /**
303          * Maps a drools controller to its policy controller.
304          */
305         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
306
307         /**
308          * Counts the number of decode errors.
309          */
310         private final AtomicInteger decodeErrors = new AtomicInteger(0);
311
312         /**
313          * Number of events we're still waiting to receive.
314          */
315         private final CountDownLatch eventCounter;
316
317         /**
318          * Constructor.
319          *
320          * @param nEvents number of events to be processed
321          */
322         public Context(int events) {
323             eventCounter = new CountDownLatch(events);
324         }
325
326         /**
327          * Destroys the context, stopping any hosts that remain.
328          */
329         public void destroy() {
330             stopHosts();
331             hosts.clear();
332         }
333
334         /**
335          * Creates and adds a new host to the context.
336          *
337          * @return the new Host
338          */
339         public Host addHost() {
340             Host host = new Host(this);
341             hosts.add(host);
342
343             return host;
344         }
345
346         /**
347          * Starts the hosts.
348          */
349         public void startHosts() {
350             hosts.forEach(host -> host.start());
351         }
352
353         /**
354          * Stops the hosts.
355          */
356         public void stopHosts() {
357             hosts.forEach(host -> host.stop());
358         }
359
360         /**
361          * Verifies that all hosts processed at least one message.
362          */
363         public void checkAllSawAMsg() {
364             int msgs = 0;
365             for (Host host : hosts) {
366                 assertTrue("msgs=" + msgs, host.messageSeen());
367                 ++msgs;
368             }
369         }
370
371         /**
372          * Offers an event to the external topic.
373          *
374          * @param event event
375          */
376         public void offerExternal(String event) {
377             externalSink.send(event);
378         }
379
380         /**
381          * Associates a controller with its drools controller.
382          *
383          * @param controller controller
384          * @param droolsController drools controller
385          */
386         public void addController(PolicyController controller, DroolsController droolsController) {
387             drools2policy.put(droolsController, controller);
388         }
389
390         /**
391          * Get controller.
392          *
393          * @param droolsController drools controller
394          * @return the controller associated with a drools controller, or {@code null} if it has no
395          *         associated controller
396          */
397         public PolicyController getController(DroolsController droolsController) {
398             return drools2policy.get(droolsController);
399         }
400
401         /**
402          * Get decode errors.
403          *
404          * @return the number of decode errors so far
405          */
406         public int getDecodeErrors() {
407             return decodeErrors.get();
408         }
409
410         /**
411          * Increments the count of decode errors.
412          */
413         public void bumpDecodeErrors() {
414             decodeErrors.incrementAndGet();
415         }
416
417         /**
418          * Get remaining events.
419          *
420          * @return the number of events that haven't been processed
421          */
422         public long getRemainingEvents() {
423             return eventCounter.getCount();
424         }
425
426         /**
427          * Adds an event to the counter.
428          */
429         public void addEvent() {
430             eventCounter.countDown();
431         }
432
433         /**
434          * Waits, for a period of time, for all events to be processed.
435          *
436          * @param time time
437          * @param units units
438          * @return {@code true} if all events have been processed, {@code false} otherwise
439          * @throws InterruptedException throws interrupted exception
440          */
441         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
442             return eventCounter.await(time, units);
443         }
444
445         /**
446          * Waits, for a period of time, for all hosts to enter the Active state.
447          *
448          * @param timeMs maximum time to wait, in milliseconds
449          * @throws InterruptedException throws interrupted exception
450          */
451         public void awaitAllActive(long timeMs) throws InterruptedException {
452             long tend = timeMs + System.currentTimeMillis();
453
454             for (Host host : hosts) {
455                 long tremain = Math.max(0, tend - System.currentTimeMillis());
456                 assertTrue(host.awaitActive(tremain));
457             }
458         }
459     }
460
461     /**
462      * Simulates a single "host".
463      */
464     private static class Host {
465
466         private final PoolingFeature feature;
467
468         /**
469          * {@code True} if this host has processed a message, {@code false} otherwise.
470          */
471         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
472
473         private final TopicSource externalSource;
474         private final TopicSource internalSource;
475
476         // mock objects
477         private final PolicyEngine engine = mock(PolicyEngine.class);
478         private final ListenerController controller = mock(ListenerController.class);
479         private final DroolsController drools = mock(DroolsController.class);
480
481         /**
482          * Constructor.
483          *
484          * @param context context
485          */
486         public Host(Context context) {
487
488             when(controller.getName()).thenReturn(CONTROLLER1);
489             when(controller.getDrools()).thenReturn(drools);
490
491             externalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
492             internalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
493
494             // stop consuming events if the controller stops
495             when(controller.stop()).thenAnswer(args -> {
496                 externalSource.unregister(controller);
497                 return true;
498             });
499
500             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
501
502             context.addController(controller, drools);
503
504             feature = new PoolingFeatureImpl(context, this);
505         }
506
507         /**
508          * Waits, for a period of time, for the host to enter the Active state.
509          *
510          * @param timeMs time to wait, in milliseconds
511          * @return {@code true} if the host entered the Active state within the given amount of
512          *         time, {@code false} otherwise
513          * @throws InterruptedException throws interrupted exception
514          */
515         public boolean awaitActive(long timeMs) throws InterruptedException {
516             return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
517         }
518
519         /**
520          * Starts threads for the host so that it begins consuming from both the external "DMaaP"
521          * topic and its own internal "DMaaP" topic.
522          */
523         public void start() {
524             feature.beforeStart(engine);
525             feature.afterCreate(controller);
526
527             feature.beforeStart(controller);
528
529             // start consuming events from the external topic
530             externalSource.register(controller);
531
532             feature.afterStart(controller);
533         }
534
535         /**
536          * Stops the host's threads.
537          */
538         public void stop() {
539             feature.beforeStop(controller);
540             externalSource.unregister(controller);
541             feature.afterStop(controller);
542         }
543
544         /**
545          * Offers an event to the feature, before the policy controller handles it.
546          *
547          * @param protocol protocol
548          * @param topic2 topic
549          * @param event event
550          * @return {@code true} if the event was handled, {@code false} otherwise
551          */
552         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
553             return feature.beforeOffer(controller, protocol, topic2, event);
554         }
555
556         /**
557          * Offers an event to the feature, after the policy controller handles it.
558          *
559          * @param protocol protocol
560          * @param topic topic
561          * @param event event
562          * @param success success
563          * @return {@code true} if the event was handled, {@code false} otherwise
564          */
565         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
566
567             return feature.afterOffer(controller, protocol, topic, event, success);
568         }
569
570         /**
571          * Offers an event to the feature, before the drools controller handles it.
572          *
573          * @param fact fact
574          * @return {@code true} if the event was handled, {@code false} otherwise
575          */
576         public boolean beforeInsert(Object fact) {
577             return feature.beforeInsert(drools, fact);
578         }
579
580         /**
581          * Offers an event to the feature, after the drools controller handles it.
582          *
583          * @param fact fact
584          * @param successInsert {@code true} if it was successfully inserted by the drools
585          *        controller, {@code false} otherwise
586          * @return {@code true} if the event was handled, {@code false} otherwise
587          */
588         public boolean afterInsert(Object fact, boolean successInsert) {
589             return feature.afterInsert(drools, fact, successInsert);
590         }
591
592         /**
593          * Indicates that a message was seen for this host.
594          */
595         public void sawMessage() {
596             sawMsg.set(true);
597         }
598
599         /**
600          * Message seen.
601          *
602          * @return {@code true} if a message was seen for this host, {@code false} otherwise
603          */
604         public boolean messageSeen() {
605             return sawMsg.get();
606         }
607     }
608
609     /**
610      * Listener for the external topic. Simulates the actions taken by
611      * <i>AggregatedPolicyController.onTopicEvent</i>.
612      */
613     private static class MyExternalTopicListener implements Answer<Void> {
614
615         private final Context context;
616         private final Host host;
617
618         public MyExternalTopicListener(Context context, Host host) {
619             this.context = context;
620             this.host = host;
621         }
622
623         @Override
624         public Void answer(InvocationOnMock args) throws Throwable {
625             int index = 0;
626             CommInfrastructure commType = args.getArgument(index++);
627             String topic = args.getArgument(index++);
628             String event = args.getArgument(index++);
629
630             if (host.beforeOffer(commType, topic, event)) {
631                 return null;
632             }
633
634             boolean result;
635             Object fact = decodeEvent(event);
636
637             if (fact == null) {
638                 result = false;
639                 context.bumpDecodeErrors();
640
641             } else {
642                 result = true;
643
644                 if (!host.beforeInsert(fact)) {
645                     // feature did not handle it so we handle it here
646                     host.afterInsert(fact, result);
647
648                     host.sawMessage();
649                     context.addEvent();
650                 }
651             }
652
653             host.afterOffer(commType, topic, event, result);
654             return null;
655         }
656     }
657
658     /**
659      * Feature with overrides.
660      */
661     private static class PoolingFeatureImpl extends PoolingFeature {
662
663         private final Context context;
664         private final Host host;
665
666         /**
667          * Constructor.
668          *
669          * @param context context
670          */
671         public PoolingFeatureImpl(Context context, Host host) {
672             this.context = context;
673             this.host = host;
674
675             /*
676              * Note: do NOT extract anything from "context" at this point, because it hasn't been
677              * fully initialized yet
678              */
679         }
680
681         @Override
682         public Properties getProperties(String featName) {
683             Properties props = new Properties();
684
685             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
686
687             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
688             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
689             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
690             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
691             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
692                     "" + STD_OFFLINE_PUB_WAIT_MS);
693             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
694                     "" + STD_START_HEARTBEAT_MS);
695             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
696             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
697             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
698                     "" + STD_ACTIVE_HEARTBEAT_MS);
699             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
700                     "" + STD_INTER_HEARTBEAT_MS);
701
702             props.putAll(makeSinkProperties(INTERNAL_TOPIC));
703             props.putAll(makeSourceProperties(INTERNAL_TOPIC));
704
705             return props;
706         }
707
708         @Override
709         public PolicyController getController(DroolsController droolsController) {
710             return context.getController(droolsController);
711         }
712
713         /**
714          * Embeds a specializer within a property name, after the prefix.
715          *
716          * @param propnm property name into which it should be embedded
717          * @param spec specializer to be embedded
718          * @return the property name, with the specializer embedded within it
719          */
720         private String specialize(String propnm, String spec) {
721             String suffix = propnm.substring(PREFIX.length());
722             return PREFIX + spec + "." + suffix;
723         }
724
725         @Override
726         protected PoolingManagerImpl makeManager(String hostName, PolicyController controller, PoolingProperties props,
727                         CountDownLatch activeLatch) {
728
729             /*
730              * Set this before creating the test, because the test's superclass
731              * constructor uses it before the test object has a chance to store it.
732              */
733             currentHost.set(host);
734
735             return new PoolingManagerTest(hostName, controller, props, activeLatch);
736         }
737     }
738
739     /**
740      * Pooling Manager with overrides.
741      */
742     private static class PoolingManagerTest extends PoolingManagerImpl {
743
744         /**
745          * Constructor.
746          *
747          * @param hostName the host
748          * @param controller the controller
749          * @param props the properties
750          * @param activeLatch the latch
751          */
752         public PoolingManagerTest(String hostName, PolicyController controller,
753                         PoolingProperties props, CountDownLatch activeLatch) {
754
755             super(hostName, controller, props, activeLatch);
756         }
757
758         @Override
759         protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
760             return new DmaapManagerImpl(topic);
761         }
762
763         @Override
764         protected boolean canDecodeEvent(DroolsController drools, String topic) {
765             return true;
766         }
767
768         @Override
769         protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
770             return decodeEvent(event);
771         }
772     }
773
774     /**
775      * DMaaP Manager with overrides.
776      */
777     private static class DmaapManagerImpl extends DmaapManager {
778
779         /**
780          * Constructor.
781          *
782          * @param topic the topic
783          * @throws PoolingFeatureException if an error occurs
784          */
785         public DmaapManagerImpl(String topic) throws PoolingFeatureException {
786             super(topic);
787         }
788
789         @Override
790         protected List<TopicSource> getTopicSources() {
791             Host host = currentHost.get();
792             return Arrays.asList(host.internalSource, host.externalSource);
793         }
794
795         @Override
796         protected List<TopicSink> getTopicSinks() {
797             return Arrays.asList(internalSink, externalSink);
798         }
799     }
800
801     /**
802      * Controller that also implements the {@link TopicListener} interface.
803      */
804     private static interface ListenerController extends PolicyController, TopicListener {
805
806     }
807 }