Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / EndToEndFeatureTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
30
31 import com.google.gson.Gson;
32 import com.google.gson.JsonParseException;
33 import java.util.Arrays;
34 import java.util.Deque;
35 import java.util.IdentityHashMap;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Properties;
39 import java.util.TreeMap;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import org.junit.After;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Ignore;
49 import org.junit.Test;
50 import org.mockito.invocation.InvocationOnMock;
51 import org.mockito.stubbing.Answer;
52 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
53 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
54 import org.onap.policy.common.endpoints.event.comm.TopicListener;
55 import org.onap.policy.common.endpoints.event.comm.TopicSink;
56 import org.onap.policy.common.endpoints.event.comm.TopicSource;
57 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
58 import org.onap.policy.drools.controller.DroolsController;
59 import org.onap.policy.drools.system.PolicyController;
60 import org.onap.policy.drools.system.PolicyEngine;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own
66  * feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the
67  * following are not: <dl> <dt>PolicyEngine, PolicyController, DroolsController</dt> <dd>mocked</dd>
68  * </dl>
69  *
70  * <p>The following fields must be set before executing this: <ul> <li>UEB_SERVERS</li>
71  * <li>INTERNAL_TOPIC</li> <li>EXTERNAL_TOPIC</li> </ul>
72  */
73 public class EndToEndFeatureTest {
74
75     private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class);
76
77     /**
78      * UEB servers for both internal & external topics.
79      */
80     private static final String UEB_SERVERS = "";
81
82     /**
83      * Name of the topic used for inter-host communication.
84      */
85     private static final String INTERNAL_TOPIC = "";
86
87     /**
88      * Name of the topic from which "external" events "arrive".
89      */
90     private static final String EXTERNAL_TOPIC = "";
91
92     /**
93      * Consumer group to use when polling the external topic.
94      */
95     private static final String EXTERNAL_GROUP = EndToEndFeatureTest.class.getName();
96
97     /**
98      * Name of the controller.
99      */
100     private static final String CONTROLLER1 = "controller.one";
101
102     /**
103      * Maximum number of items to fetch from DMaaP in a single poll.
104      */
105     private static final String FETCH_LIMIT = "5";
106
107     private static final long STD_REACTIVATE_WAIT_MS = 10000;
108     private static final long STD_IDENTIFICATION_MS = 10000;
109     private static final long STD_START_HEARTBEAT_MS = 15000;
110     private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
111     private static final long STD_INTER_HEARTBEAT_MS = 5000;
112     private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
113     private static final long EVENT_WAIT_SEC = 15;
114
115     /**
116      * Used to decode events from the external topic.
117      */
118     private static final Gson mapper = new Gson();
119
120     /**
121      * Used to identify the current host.
122      */
123     private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>();
124
125     /**
126      * Sink for external DMaaP topic.
127      */
128     private static TopicSink externalSink;
129
130     /**
131      * Sink for internal DMaaP topic.
132      */
133     private static TopicSink internalSink;
134
135     /**
136      * Context for the current test case.
137      */
138     private Context ctx;
139
140     /**
141      * Setup before class.
142      *
143      */
144     @BeforeClass
145     public static void setUpBeforeClass() {
146         externalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
147         externalSink.start();
148
149         internalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
150         internalSink.start();
151     }
152
153     /**
154      * Tear down after class.
155      *
156      */
157     @AfterClass
158     public static void tearDownAfterClass() {
159         externalSink.stop();
160         internalSink.stop();
161     }
162
163     /**
164      * Setup.
165      */
166     @Before
167     public void setUp() {
168         ctx = null;
169     }
170
171     /**
172      * Tear down.
173      */
174     @After
175     public void tearDown() {
176         if (ctx != null) {
177             ctx.destroy();
178         }
179     }
180
181     /*
182      * This test should only be run manually, after configuring all of the fields,
183      * thus it is ignored.
184      */
185     @Ignore
186     @Test
187     public void test_SingleHost() throws Exception {    // NOSONAR
188         run(70, 1);
189     }
190
191     /*
192      * This test should only be run manually, after configuring all of the fields,
193      * thus it is ignored.
194      */
195     @Ignore
196     @Test
197     public void test_TwoHosts() throws Exception {      // NOSONAR
198         run(200, 2);
199     }
200
201     /*
202      * This test should only be run manually, after configuring all of the fields,
203      * thus it is ignored.
204      */
205     @Ignore
206     @Test
207     public void test_ThreeHosts() throws Exception {    // NOSONAR
208         run(200, 3);
209     }
210
211     private void run(int nmessages, int nhosts) throws Exception {
212         ctx = new Context(nmessages);
213
214         for (int x = 0; x < nhosts; ++x) {
215             ctx.addHost();
216         }
217
218         ctx.startHosts();
219         ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
220
221         for (int x = 0; x < nmessages; ++x) {
222             ctx.offerExternal(makeMessage(x));
223         }
224
225         ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
226
227         assertEquals(0, ctx.getDecodeErrors());
228         assertEquals(0, ctx.getRemainingEvents());
229         ctx.checkAllSawAMsg();
230     }
231
232     private String makeMessage(int reqnum) {
233         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
234     }
235
236     private static Properties makeSinkProperties(String topic) {
237         Properties props = new Properties();
238
239         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic);
240
241         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
242                 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
243         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
244                 + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
245         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
246                 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
247
248         return props;
249     }
250
251     private static Properties makeSourceProperties(String topic) {
252         Properties props = new Properties();
253
254         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
255
256         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
257                 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
258         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
259                 + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
260         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
261                 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
262
263         if (EXTERNAL_TOPIC.equals(topic)) {
264             // consumer group is a constant
265             props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
266                     + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
267
268             // consumer instance is generated by the BusConsumer code
269         }
270
271         // else internal topic: feature populates info for internal topic
272
273         return props;
274     }
275
276     /**
277      * Decodes an event.
278      *
279      * @param event event
280      * @return the decoded event, or {@code null} if it cannot be decoded
281      */
282     private static Object decodeEvent(String event) {
283         try {
284             return mapper.fromJson(event, TreeMap.class);
285
286         } catch (JsonParseException e) {
287             logger.warn("cannot decode external event", e);
288             return null;
289         }
290     }
291
292     /**
293      * Context used for a single test case.
294      */
295     private static class Context {
296
297         /**
298          * Hosts that have been added to this context.
299          */
300         private final Deque<Host> hosts = new LinkedList<>();
301
302         /**
303          * Maps a drools controller to its policy controller.
304          */
305         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
306
307         /**
308          * Counts the number of decode errors.
309          */
310         private final AtomicInteger decodeErrors = new AtomicInteger(0);
311
312         /**
313          * Number of events we're still waiting to receive.
314          */
315         private final CountDownLatch eventCounter;
316
317         /**
318          * Constructor.
319          *
320          * @param events number of events to be processed
321          */
322         public Context(int events) {
323             eventCounter = new CountDownLatch(events);
324         }
325
326         /**
327          * Destroys the context, stopping any hosts that remain.
328          */
329         public void destroy() {
330             stopHosts();
331             hosts.clear();
332         }
333
334         /**
335          * Creates and adds a new host to the context.
336          *
337          * @return the new Host
338          */
339         public Host addHost() {
340             Host host = new Host(this);
341             hosts.add(host);
342
343             return host;
344         }
345
346         /**
347          * Starts the hosts.
348          */
349         public void startHosts() {
350             hosts.forEach(host -> host.start());
351         }
352
353         /**
354          * Stops the hosts.
355          */
356         public void stopHosts() {
357             hosts.forEach(host -> host.stop());
358         }
359
360         /**
361          * Verifies that all hosts processed at least one message.
362          */
363         public void checkAllSawAMsg() {
364             int msgs = 0;
365             for (Host host : hosts) {
366                 assertTrue("msgs=" + msgs, host.messageSeen());
367                 ++msgs;
368             }
369         }
370
371         /**
372          * Offers an event to the external topic.
373          *
374          * @param event event
375          */
376         public void offerExternal(String event) {
377             externalSink.send(event);
378         }
379
380         /**
381          * Associates a controller with its drools controller.
382          *
383          * @param controller controller
384          * @param droolsController drools controller
385          */
386         public void addController(PolicyController controller, DroolsController droolsController) {
387             drools2policy.put(droolsController, controller);
388         }
389
390         /**
391          * Get controller.
392          *
393          * @param droolsController drools controller
394          * @return the controller associated with a drools controller, or {@code null} if it has no
395          *         associated controller
396          */
397         public PolicyController getController(DroolsController droolsController) {
398             return drools2policy.get(droolsController);
399         }
400
401         /**
402          * Get decode errors.
403          *
404          * @return the number of decode errors so far
405          */
406         public int getDecodeErrors() {
407             return decodeErrors.get();
408         }
409
410         /**
411          * Increments the count of decode errors.
412          */
413         public void bumpDecodeErrors() {
414             decodeErrors.incrementAndGet();
415         }
416
417         /**
418          * Get remaining events.
419          *
420          * @return the number of events that haven't been processed
421          */
422         public long getRemainingEvents() {
423             return eventCounter.getCount();
424         }
425
426         /**
427          * Adds an event to the counter.
428          */
429         public void addEvent() {
430             eventCounter.countDown();
431         }
432
433         /**
434          * Waits, for a period of time, for all events to be processed.
435          *
436          * @param time time
437          * @param units units
438          * @return {@code true} if all events have been processed, {@code false} otherwise
439          * @throws InterruptedException throws interrupted exception
440          */
441         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
442             return eventCounter.await(time, units);
443         }
444
445         /**
446          * Waits, for a period of time, for all hosts to enter the Active state.
447          *
448          * @param timeMs maximum time to wait, in milliseconds
449          * @throws InterruptedException throws interrupted exception
450          */
451         public void awaitAllActive(long timeMs) throws InterruptedException {
452             long tend = timeMs + System.currentTimeMillis();
453
454             for (Host host : hosts) {
455                 long tremain = Math.max(0, tend - System.currentTimeMillis());
456                 assertTrue(host.awaitActive(tremain));
457             }
458         }
459     }
460
461     /**
462      * Simulates a single "host".
463      */
464     private static class Host {
465
466         private final PoolingFeature feature;
467
468         /**
469          * {@code True} if this host has processed a message, {@code false} otherwise.
470          */
471         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
472
473         private final TopicSource externalSource;
474         private final TopicSource internalSource;
475
476         // mock objects
477         private final PolicyEngine engine = mock(PolicyEngine.class);
478         private final ListenerController controller = mock(ListenerController.class);
479         private final DroolsController drools = mock(DroolsController.class);
480
481         /**
482          * Constructor.
483          *
484          * @param context context
485          */
486         public Host(Context context) {
487
488             when(controller.getName()).thenReturn(CONTROLLER1);
489             when(controller.getDrools()).thenReturn(drools);
490
491             externalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(EXTERNAL_TOPIC))
492                             .get(0);
493             internalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(INTERNAL_TOPIC))
494                             .get(0);
495
496             // stop consuming events if the controller stops
497             when(controller.stop()).thenAnswer(args -> {
498                 externalSource.unregister(controller);
499                 return true;
500             });
501
502             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
503
504             context.addController(controller, drools);
505
506             feature = new PoolingFeatureImpl(context, this);
507         }
508
509         /**
510          * Waits, for a period of time, for the host to enter the Active state.
511          *
512          * @param timeMs time to wait, in milliseconds
513          * @return {@code true} if the host entered the Active state within the given amount of
514          *         time, {@code false} otherwise
515          * @throws InterruptedException throws interrupted exception
516          */
517         public boolean awaitActive(long timeMs) throws InterruptedException {
518             return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
519         }
520
521         /**
522          * Starts threads for the host so that it begins consuming from both the external "DMaaP"
523          * topic and its own internal "DMaaP" topic.
524          */
525         public void start() {
526             feature.beforeStart(engine);
527             feature.afterCreate(controller);
528
529             feature.beforeStart(controller);
530
531             // start consuming events from the external topic
532             externalSource.register(controller);
533
534             feature.afterStart(controller);
535         }
536
537         /**
538          * Stops the host's threads.
539          */
540         public void stop() {
541             feature.beforeStop(controller);
542             externalSource.unregister(controller);
543             feature.afterStop(controller);
544         }
545
546         /**
547          * Offers an event to the feature, before the policy controller handles it.
548          *
549          * @param protocol protocol
550          * @param topic2 topic
551          * @param event event
552          * @return {@code true} if the event was handled, {@code false} otherwise
553          */
554         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
555             return feature.beforeOffer(controller, protocol, topic2, event);
556         }
557
558         /**
559          * Offers an event to the feature, after the policy controller handles it.
560          *
561          * @param protocol protocol
562          * @param topic topic
563          * @param event event
564          * @param success success
565          * @return {@code true} if the event was handled, {@code false} otherwise
566          */
567         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
568
569             return feature.afterOffer(controller, protocol, topic, event, success);
570         }
571
572         /**
573          * Offers an event to the feature, before the drools controller handles it.
574          *
575          * @param fact fact
576          * @return {@code true} if the event was handled, {@code false} otherwise
577          */
578         public boolean beforeInsert(Object fact) {
579             return feature.beforeInsert(drools, fact);
580         }
581
582         /**
583          * Offers an event to the feature, after the drools controller handles it.
584          *
585          * @param fact fact
586          * @param successInsert {@code true} if it was successfully inserted by the drools
587          *        controller, {@code false} otherwise
588          * @return {@code true} if the event was handled, {@code false} otherwise
589          */
590         public boolean afterInsert(Object fact, boolean successInsert) {
591             return feature.afterInsert(drools, fact, successInsert);
592         }
593
594         /**
595          * Indicates that a message was seen for this host.
596          */
597         public void sawMessage() {
598             sawMsg.set(true);
599         }
600
601         /**
602          * Message seen.
603          *
604          * @return {@code true} if a message was seen for this host, {@code false} otherwise
605          */
606         public boolean messageSeen() {
607             return sawMsg.get();
608         }
609     }
610
611     /**
612      * Listener for the external topic. Simulates the actions taken by
613      * <i>AggregatedPolicyController.onTopicEvent</i>.
614      */
615     private static class MyExternalTopicListener implements Answer<Void> {
616
617         private final Context context;
618         private final Host host;
619
620         public MyExternalTopicListener(Context context, Host host) {
621             this.context = context;
622             this.host = host;
623         }
624
625         @Override
626         public Void answer(InvocationOnMock args) throws Throwable {
627             int index = 0;
628             CommInfrastructure commType = args.getArgument(index++);
629             String topic = args.getArgument(index++);
630             String event = args.getArgument(index++);
631
632             if (host.beforeOffer(commType, topic, event)) {
633                 return null;
634             }
635
636             boolean result;
637             Object fact = decodeEvent(event);
638
639             if (fact == null) {
640                 result = false;
641                 context.bumpDecodeErrors();
642
643             } else {
644                 result = true;
645
646                 if (!host.beforeInsert(fact)) {
647                     // feature did not handle it so we handle it here
648                     host.afterInsert(fact, result);
649
650                     host.sawMessage();
651                     context.addEvent();
652                 }
653             }
654
655             host.afterOffer(commType, topic, event, result);
656             return null;
657         }
658     }
659
660     /**
661      * Feature with overrides.
662      */
663     private static class PoolingFeatureImpl extends PoolingFeature {
664
665         private final Context context;
666         private final Host host;
667
668         /**
669          * Constructor.
670          *
671          * @param context context
672          */
673         public PoolingFeatureImpl(Context context, Host host) {
674             this.context = context;
675             this.host = host;
676
677             /*
678              * Note: do NOT extract anything from "context" at this point, because it hasn't been
679              * fully initialized yet
680              */
681         }
682
683         @Override
684         public Properties getProperties(String featName) {
685             Properties props = new Properties();
686
687             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
688
689             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
690             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
691             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
692             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
693             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
694                     "" + STD_OFFLINE_PUB_WAIT_MS);
695             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
696                     "" + STD_START_HEARTBEAT_MS);
697             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
698             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
699             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
700                     "" + STD_ACTIVE_HEARTBEAT_MS);
701             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
702                     "" + STD_INTER_HEARTBEAT_MS);
703
704             props.putAll(makeSinkProperties(INTERNAL_TOPIC));
705             props.putAll(makeSourceProperties(INTERNAL_TOPIC));
706
707             return props;
708         }
709
710         @Override
711         public PolicyController getController(DroolsController droolsController) {
712             return context.getController(droolsController);
713         }
714
715         /**
716          * Embeds a specializer within a property name, after the prefix.
717          *
718          * @param propnm property name into which it should be embedded
719          * @param spec specializer to be embedded
720          * @return the property name, with the specializer embedded within it
721          */
722         private String specialize(String propnm, String spec) {
723             String suffix = propnm.substring(PREFIX.length());
724             return PREFIX + spec + "." + suffix;
725         }
726
727         @Override
728         protected PoolingManagerImpl makeManager(String hostName, PolicyController controller, PoolingProperties props,
729                         CountDownLatch activeLatch) {
730
731             /*
732              * Set this before creating the test, because the test's superclass
733              * constructor uses it before the test object has a chance to store it.
734              */
735             currentHost.set(host);
736
737             return new PoolingManagerTest(hostName, controller, props, activeLatch);
738         }
739     }
740
741     /**
742      * Pooling Manager with overrides.
743      */
744     private static class PoolingManagerTest extends PoolingManagerImpl {
745
746         /**
747          * Constructor.
748          *
749          * @param hostName the host
750          * @param controller the controller
751          * @param props the properties
752          * @param activeLatch the latch
753          */
754         public PoolingManagerTest(String hostName, PolicyController controller,
755                         PoolingProperties props, CountDownLatch activeLatch) {
756
757             super(hostName, controller, props, activeLatch);
758         }
759
760         @Override
761         protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
762             return new DmaapManagerImpl(topic);
763         }
764
765         @Override
766         protected boolean canDecodeEvent(DroolsController drools, String topic) {
767             return true;
768         }
769
770         @Override
771         protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
772             return decodeEvent(event);
773         }
774     }
775
776     /**
777      * DMaaP Manager with overrides.
778      */
779     private static class DmaapManagerImpl extends DmaapManager {
780
781         /**
782          * Constructor.
783          *
784          * @param topic the topic
785          * @throws PoolingFeatureException if an error occurs
786          */
787         public DmaapManagerImpl(String topic) throws PoolingFeatureException {
788             super(topic);
789         }
790
791         @Override
792         protected List<TopicSource> getTopicSources() {
793             Host host = currentHost.get();
794             return Arrays.asList(host.internalSource, host.externalSource);
795         }
796
797         @Override
798         protected List<TopicSink> getTopicSinks() {
799             return Arrays.asList(internalSink, externalSink);
800         }
801     }
802
803     /**
804      * Controller that also implements the {@link TopicListener} interface.
805      */
806     private static interface ListenerController extends PolicyController, TopicListener {
807
808     }
809 }