Remove jackson from feature-pooling-dmaap
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / EndToEndFeatureTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
30
31 import com.google.gson.Gson;
32 import com.google.gson.JsonParseException;
33 import java.util.Arrays;
34 import java.util.Deque;
35 import java.util.IdentityHashMap;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Properties;
39 import java.util.TreeMap;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import org.junit.After;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Ignore;
49 import org.junit.Test;
50 import org.mockito.invocation.InvocationOnMock;
51 import org.mockito.stubbing.Answer;
52 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
53 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
54 import org.onap.policy.common.endpoints.event.comm.TopicListener;
55 import org.onap.policy.common.endpoints.event.comm.TopicSink;
56 import org.onap.policy.common.endpoints.event.comm.TopicSource;
57 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
58 import org.onap.policy.drools.controller.DroolsController;
59 import org.onap.policy.drools.system.PolicyController;
60 import org.onap.policy.drools.system.PolicyEngine;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its own
66  * feature object. Uses real feature objects, as well as real DMaaP sources and sinks. However, the
67  * following are not: <dl> <dt>PolicyEngine, PolicyController, DroolsController</dt> <dd>mocked</dd>
68  * </dl>
69  *
70  * <p>The following fields must be set before executing this: <ul> <li>UEB_SERVERS</li>
71  * <li>INTERNAL_TOPIC</li> <li>EXTERNAL_TOPIC</li> </ul>
72  */
73 public class EndToEndFeatureTest {
74
75     private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class);
76
77     /**
78      * UEB servers for both internal & external topics.
79      */
80     private static final String UEB_SERVERS = "";
81
82     /**
83      * Name of the topic used for inter-host communication.
84      */
85     private static final String INTERNAL_TOPIC = "";
86
87     /**
88      * Name of the topic from which "external" events "arrive".
89      */
90     private static final String EXTERNAL_TOPIC = "";
91
92     /**
93      * Consumer group to use when polling the external topic.
94      */
95     private static final String EXTERNAL_GROUP = EndToEndFeatureTest.class.getName();
96
97     /**
98      * Name of the controller.
99      */
100     private static final String CONTROLLER1 = "controller.one";
101
102     /**
103      * Maximum number of items to fetch from DMaaP in a single poll.
104      */
105     private static final String FETCH_LIMIT = "5";
106
107     private static final long STD_REACTIVATE_WAIT_MS = 10000;
108     private static final long STD_IDENTIFICATION_MS = 10000;
109     private static final long STD_START_HEARTBEAT_MS = 15000;
110     private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
111     private static final long STD_INTER_HEARTBEAT_MS = 5000;
112     private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
113     private static final long EVENT_WAIT_SEC = 15;
114
115     /**
116      * Used to decode events from the external topic.
117      */
118     private static final Gson mapper = new Gson();
119
120     /**
121      * Used to identify the current host.
122      */
123     private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>();
124
125     /**
126      * Sink for external DMaaP topic.
127      */
128     private static TopicSink externalSink;
129
130     /**
131      * Sink for internal DMaaP topic.
132      */
133     private static TopicSink internalSink;
134
135     /**
136      * Context for the current test case.
137      */
138     private Context ctx;
139
140     /**
141      * Setup before class.
142      *
143      */
144     @BeforeClass
145     public static void setUpBeforeClass() {
146         externalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
147         externalSink.start();
148
149         internalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
150         internalSink.start();
151     }
152
153     /**
154      * Tear down after class.
155      *
156      */
157     @AfterClass
158     public static void tearDownAfterClass() {
159         externalSink.stop();
160         internalSink.stop();
161     }
162
163     /**
164      * Setup.
165      */
166     @Before
167     public void setUp() {
168         ctx = null;
169     }
170
171     /**
172      * Tear down.
173      */
174     @After
175     public void tearDown() {
176         if (ctx != null) {
177             ctx.destroy();
178         }
179     }
180
181     @Ignore
182     @Test
183     public void test_SingleHost() throws Exception {
184         run(70, 1);
185     }
186
187     @Ignore
188     @Test
189     public void test_TwoHosts() throws Exception {
190         run(200, 2);
191     }
192
193     @Ignore
194     @Test
195     public void test_ThreeHosts() throws Exception {
196         run(200, 3);
197     }
198
199     private void run(int nmessages, int nhosts) throws Exception {
200         ctx = new Context(nmessages);
201
202         for (int x = 0; x < nhosts; ++x) {
203             ctx.addHost();
204         }
205
206         ctx.startHosts();
207         ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
208
209         for (int x = 0; x < nmessages; ++x) {
210             ctx.offerExternal(makeMessage(x));
211         }
212
213         ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
214
215         assertEquals(0, ctx.getDecodeErrors());
216         assertEquals(0, ctx.getRemainingEvents());
217         ctx.checkAllSawAMsg();
218     }
219
220     private String makeMessage(int reqnum) {
221         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
222     }
223
224     private static Properties makeSinkProperties(String topic) {
225         Properties props = new Properties();
226
227         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic);
228
229         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
230                 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
231         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
232                 + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
233         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
234                 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
235
236         return props;
237     }
238
239     private static Properties makeSourceProperties(String topic) {
240         Properties props = new Properties();
241
242         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
243
244         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
245                 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
246         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
247                 + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
248         props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
249                 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
250
251         if (EXTERNAL_TOPIC.equals(topic)) {
252             // consumer group is a constant
253             props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
254                     + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
255
256             // consumer instance is generated by the BusConsumer code
257         }
258
259         // else internal topic: feature populates info for internal topic
260
261         return props;
262     }
263
264     /**
265      * Decodes an event.
266      *
267      * @param event event
268      * @return the decoded event, or {@code null} if it cannot be decoded
269      */
270     private static Object decodeEvent(String event) {
271         try {
272             return mapper.fromJson(event, TreeMap.class);
273
274         } catch (JsonParseException e) {
275             logger.warn("cannot decode external event", e);
276             return null;
277         }
278     }
279
280     /**
281      * Context used for a single test case.
282      */
283     private static class Context {
284
285         /**
286          * Hosts that have been added to this context.
287          */
288         private final Deque<Host> hosts = new LinkedList<>();
289
290         /**
291          * Maps a drools controller to its policy controller.
292          */
293         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
294
295         /**
296          * Counts the number of decode errors.
297          */
298         private final AtomicInteger decodeErrors = new AtomicInteger(0);
299
300         /**
301          * Number of events we're still waiting to receive.
302          */
303         private final CountDownLatch eventCounter;
304
305         /**
306          * Constructor.
307          *
308          * @param nEvents number of events to be processed
309          */
310         public Context(int events) {
311             eventCounter = new CountDownLatch(events);
312         }
313
314         /**
315          * Destroys the context, stopping any hosts that remain.
316          */
317         public void destroy() {
318             stopHosts();
319             hosts.clear();
320         }
321
322         /**
323          * Creates and adds a new host to the context.
324          *
325          * @return the new Host
326          */
327         public Host addHost() {
328             Host host = new Host(this);
329             hosts.add(host);
330
331             return host;
332         }
333
334         /**
335          * Starts the hosts.
336          */
337         public void startHosts() {
338             hosts.forEach(host -> host.start());
339         }
340
341         /**
342          * Stops the hosts.
343          */
344         public void stopHosts() {
345             hosts.forEach(host -> host.stop());
346         }
347
348         /**
349          * Verifies that all hosts processed at least one message.
350          */
351         public void checkAllSawAMsg() {
352             int msgs = 0;
353             for (Host host : hosts) {
354                 assertTrue("msgs=" + msgs, host.messageSeen());
355                 ++msgs;
356             }
357         }
358
359         /**
360          * Offers an event to the external topic.
361          *
362          * @param event event
363          */
364         public void offerExternal(String event) {
365             externalSink.send(event);
366         }
367
368         /**
369          * Associates a controller with its drools controller.
370          *
371          * @param controller controller
372          * @param droolsController drools controller
373          */
374         public void addController(PolicyController controller, DroolsController droolsController) {
375             drools2policy.put(droolsController, controller);
376         }
377
378         /**
379          * Get controller.
380          *
381          * @param droolsController drools controller
382          * @return the controller associated with a drools controller, or {@code null} if it has no
383          *         associated controller
384          */
385         public PolicyController getController(DroolsController droolsController) {
386             return drools2policy.get(droolsController);
387         }
388
389         /**
390          * Get decode errors.
391          *
392          * @return the number of decode errors so far
393          */
394         public int getDecodeErrors() {
395             return decodeErrors.get();
396         }
397
398         /**
399          * Increments the count of decode errors.
400          */
401         public void bumpDecodeErrors() {
402             decodeErrors.incrementAndGet();
403         }
404
405         /**
406          * Get remaining events.
407          *
408          * @return the number of events that haven't been processed
409          */
410         public long getRemainingEvents() {
411             return eventCounter.getCount();
412         }
413
414         /**
415          * Adds an event to the counter.
416          */
417         public void addEvent() {
418             eventCounter.countDown();
419         }
420
421         /**
422          * Waits, for a period of time, for all events to be processed.
423          *
424          * @param time time
425          * @param units units
426          * @return {@code true} if all events have been processed, {@code false} otherwise
427          * @throws InterruptedException throws interrupted exception
428          */
429         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
430             return eventCounter.await(time, units);
431         }
432
433         /**
434          * Waits, for a period of time, for all hosts to enter the Active state.
435          *
436          * @param timeMs maximum time to wait, in milliseconds
437          * @throws InterruptedException throws interrupted exception
438          */
439         public void awaitAllActive(long timeMs) throws InterruptedException {
440             long tend = timeMs + System.currentTimeMillis();
441
442             for (Host host : hosts) {
443                 long tremain = Math.max(0, tend - System.currentTimeMillis());
444                 assertTrue(host.awaitActive(tremain));
445             }
446         }
447     }
448
449     /**
450      * Simulates a single "host".
451      */
452     private static class Host {
453
454         private final PoolingFeature feature;
455
456         /**
457          * {@code True} if this host has processed a message, {@code false} otherwise.
458          */
459         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
460
461         private final TopicSource externalSource;
462         private final TopicSource internalSource;
463
464         // mock objects
465         private final PolicyEngine engine = mock(PolicyEngine.class);
466         private final ListenerController controller = mock(ListenerController.class);
467         private final DroolsController drools = mock(DroolsController.class);
468
469         /**
470          * Constructor.
471          *
472          * @param context context
473          */
474         public Host(Context context) {
475
476             when(controller.getName()).thenReturn(CONTROLLER1);
477             when(controller.getDrools()).thenReturn(drools);
478
479             externalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
480             internalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
481
482             // stop consuming events if the controller stops
483             when(controller.stop()).thenAnswer(args -> {
484                 externalSource.unregister(controller);
485                 return true;
486             });
487
488             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
489
490             context.addController(controller, drools);
491
492             feature = new PoolingFeatureImpl(context, this);
493         }
494
495         /**
496          * Waits, for a period of time, for the host to enter the Active state.
497          *
498          * @param timeMs time to wait, in milliseconds
499          * @return {@code true} if the host entered the Active state within the given amount of
500          *         time, {@code false} otherwise
501          * @throws InterruptedException throws interrupted exception
502          */
503         public boolean awaitActive(long timeMs) throws InterruptedException {
504             return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
505         }
506
507         /**
508          * Starts threads for the host so that it begins consuming from both the external "DMaaP"
509          * topic and its own internal "DMaaP" topic.
510          */
511         public void start() {
512             feature.beforeStart(engine);
513             feature.afterCreate(controller);
514
515             feature.beforeStart(controller);
516
517             // start consuming events from the external topic
518             externalSource.register(controller);
519
520             feature.afterStart(controller);
521         }
522
523         /**
524          * Stops the host's threads.
525          */
526         public void stop() {
527             feature.beforeStop(controller);
528             externalSource.unregister(controller);
529             feature.afterStop(controller);
530         }
531
532         /**
533          * Offers an event to the feature, before the policy controller handles it.
534          *
535          * @param protocol protocol
536          * @param topic2 topic
537          * @param event event
538          * @return {@code true} if the event was handled, {@code false} otherwise
539          */
540         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
541             return feature.beforeOffer(controller, protocol, topic2, event);
542         }
543
544         /**
545          * Offers an event to the feature, after the policy controller handles it.
546          *
547          * @param protocol protocol
548          * @param topic topic
549          * @param event event
550          * @param success success
551          * @return {@code true} if the event was handled, {@code false} otherwise
552          */
553         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
554
555             return feature.afterOffer(controller, protocol, topic, event, success);
556         }
557
558         /**
559          * Offers an event to the feature, before the drools controller handles it.
560          *
561          * @param fact fact
562          * @return {@code true} if the event was handled, {@code false} otherwise
563          */
564         public boolean beforeInsert(Object fact) {
565             return feature.beforeInsert(drools, fact);
566         }
567
568         /**
569          * Offers an event to the feature, after the drools controller handles it.
570          *
571          * @param fact fact
572          * @param successInsert {@code true} if it was successfully inserted by the drools
573          *        controller, {@code false} otherwise
574          * @return {@code true} if the event was handled, {@code false} otherwise
575          */
576         public boolean afterInsert(Object fact, boolean successInsert) {
577             return feature.afterInsert(drools, fact, successInsert);
578         }
579
580         /**
581          * Indicates that a message was seen for this host.
582          */
583         public void sawMessage() {
584             sawMsg.set(true);
585         }
586
587         /**
588          * Message seen.
589          *
590          * @return {@code true} if a message was seen for this host, {@code false} otherwise
591          */
592         public boolean messageSeen() {
593             return sawMsg.get();
594         }
595     }
596
597     /**
598      * Listener for the external topic. Simulates the actions taken by
599      * <i>AggregatedPolicyController.onTopicEvent</i>.
600      */
601     private static class MyExternalTopicListener implements Answer<Void> {
602
603         private final Context context;
604         private final Host host;
605
606         public MyExternalTopicListener(Context context, Host host) {
607             this.context = context;
608             this.host = host;
609         }
610
611         @Override
612         public Void answer(InvocationOnMock args) throws Throwable {
613             int index = 0;
614             CommInfrastructure commType = args.getArgument(index++);
615             String topic = args.getArgument(index++);
616             String event = args.getArgument(index++);
617
618             if (host.beforeOffer(commType, topic, event)) {
619                 return null;
620             }
621
622             boolean result;
623             Object fact = decodeEvent(event);
624
625             if (fact == null) {
626                 result = false;
627                 context.bumpDecodeErrors();
628
629             } else {
630                 result = true;
631
632                 if (!host.beforeInsert(fact)) {
633                     // feature did not handle it so we handle it here
634                     host.afterInsert(fact, result);
635
636                     host.sawMessage();
637                     context.addEvent();
638                 }
639             }
640
641             host.afterOffer(commType, topic, event, result);
642             return null;
643         }
644     }
645
646     /**
647      * Feature with overrides.
648      */
649     private static class PoolingFeatureImpl extends PoolingFeature {
650
651         private final Context context;
652         private final Host host;
653
654         /**
655          * Constructor.
656          *
657          * @param context context
658          */
659         public PoolingFeatureImpl(Context context, Host host) {
660             this.context = context;
661             this.host = host;
662
663             /*
664              * Note: do NOT extract anything from "context" at this point, because it hasn't been
665              * fully initialized yet
666              */
667         }
668
669         @Override
670         public Properties getProperties(String featName) {
671             Properties props = new Properties();
672
673             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
674
675             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
676             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
677             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
678             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
679             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
680                     "" + STD_OFFLINE_PUB_WAIT_MS);
681             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
682                     "" + STD_START_HEARTBEAT_MS);
683             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
684             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
685             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
686                     "" + STD_ACTIVE_HEARTBEAT_MS);
687             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
688                     "" + STD_INTER_HEARTBEAT_MS);
689
690             props.putAll(makeSinkProperties(INTERNAL_TOPIC));
691             props.putAll(makeSourceProperties(INTERNAL_TOPIC));
692
693             return props;
694         }
695
696         @Override
697         public PolicyController getController(DroolsController droolsController) {
698             return context.getController(droolsController);
699         }
700
701         /**
702          * Embeds a specializer within a property name, after the prefix.
703          *
704          * @param propnm property name into which it should be embedded
705          * @param spec specializer to be embedded
706          * @return the property name, with the specializer embedded within it
707          */
708         private String specialize(String propnm, String spec) {
709             String suffix = propnm.substring(PREFIX.length());
710             return PREFIX + spec + "." + suffix;
711         }
712
713         @Override
714         protected PoolingManagerImpl makeManager(String hostName, PolicyController controller, PoolingProperties props,
715                         CountDownLatch activeLatch) {
716
717             /*
718              * Set this before creating the test, because the test's superclass
719              * constructor uses it before the test object has a chance to store it.
720              */
721             currentHost.set(host);
722
723             return new PoolingManagerTest(hostName, controller, props, activeLatch);
724         }
725     }
726
727     /**
728      * Pooling Manager with overrides.
729      */
730     private static class PoolingManagerTest extends PoolingManagerImpl {
731
732         /**
733          * Constructor.
734          *
735          * @param hostName the host
736          * @param controller the controller
737          * @param props the properties
738          * @param activeLatch the latch
739          */
740         public PoolingManagerTest(String hostName, PolicyController controller,
741                         PoolingProperties props, CountDownLatch activeLatch) {
742
743             super(hostName, controller, props, activeLatch);
744         }
745
746         @Override
747         protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
748             return new DmaapManagerImpl(topic);
749         }
750
751         @Override
752         protected boolean canDecodeEvent(DroolsController drools, String topic) {
753             return true;
754         }
755
756         @Override
757         protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
758             return decodeEvent(event);
759         }
760     }
761
762     /**
763      * DMaaP Manager with overrides.
764      */
765     private static class DmaapManagerImpl extends DmaapManager {
766
767         /**
768          * Constructor.
769          *
770          * @param topic the topic
771          * @throws PoolingFeatureException if an error occurs
772          */
773         public DmaapManagerImpl(String topic) throws PoolingFeatureException {
774             super(topic);
775         }
776
777         @Override
778         protected List<TopicSource> getTopicSources() {
779             Host host = currentHost.get();
780             return Arrays.asList(host.internalSource, host.externalSource);
781         }
782
783         @Override
784         protected List<TopicSink> getTopicSinks() {
785             return Arrays.asList(internalSink, externalSink);
786         }
787     }
788
789     /**
790      * Controller that also implements the {@link TopicListener} interface.
791      */
792     private static interface ListenerController extends PolicyController, TopicListener {
793
794     }
795 }