Remove jackson from feature-pooling-dmaap
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / FeatureTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
30
31 import com.google.gson.Gson;
32 import com.google.gson.JsonParseException;
33 import java.util.Arrays;
34 import java.util.Deque;
35 import java.util.IdentityHashMap;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Properties;
39 import java.util.TreeMap;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.LinkedBlockingQueue;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.atomic.AtomicReference;
49 import org.junit.After;
50 import org.junit.Before;
51 import org.junit.Test;
52 import org.mockito.invocation.InvocationOnMock;
53 import org.mockito.stubbing.Answer;
54 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
55 import org.onap.policy.common.endpoints.event.comm.Topic;
56 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
57 import org.onap.policy.common.endpoints.event.comm.TopicListener;
58 import org.onap.policy.common.endpoints.event.comm.TopicSink;
59 import org.onap.policy.common.endpoints.event.comm.TopicSource;
60 import org.onap.policy.drools.controller.DroolsController;
61 import org.onap.policy.drools.pooling.message.Message;
62 import org.onap.policy.drools.system.PolicyController;
63 import org.onap.policy.drools.system.PolicyEngine;
64 import org.onap.policy.drools.utils.Pair;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 /**
69  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
70  * its own feature object. Uses real feature objects. However, the following are not:
71  * <dl>
72  * <dt>DMaaP sources and sinks</dt>
73  * <dd>simulated using queues. There is one queue for the external topic, and one queue
74  * for each host's internal topic. Messages published to the "admin" channel are simply
75  * sent to all of the hosts' internal topic queues</dd>
76  * <dt>PolicyEngine, PolicyController, DroolsController</dt>
77  * <dd>mocked</dd>
78  * </dl>
79  * 
80  * <p>Invoke {@link #runSlow()}, before the test, to slow things down.
81  */
82 public class FeatureTest {
83
84     private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
85
86     /**
87      * Name of the topic used for inter-host communication.
88      */
89     private static final String INTERNAL_TOPIC = "my.internal.topic";
90
91     /**
92      * Name of the topic from which "external" events "arrive".
93      */
94     private static final String EXTERNAL_TOPIC = "my.external.topic";
95
96     /**
97      * Name of the controller.
98      */
99     private static final String CONTROLLER1 = "controller.one";
100
101     private static long stdReactivateWaitMs = 200;
102     private static long stdIdentificationMs = 60;
103     private static long stdStartHeartbeatMs = 60;
104     private static long stdActiveHeartbeatMs = 50;
105     private static long stdInterHeartbeatMs = 5;
106     private static long stdOfflinePubWaitMs = 2;
107     private static long stdPollMs = 2;
108     private static long stdInterPollMs = 2;
109     private static long stdEventWaitSec = 10;
110     /**
111      * Used to decode events from the external topic.
112      */
113     private static final Gson mapper = new Gson();
114
115     /**
116      * Used to identify the current context.
117      */
118     private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
119
120     /**
121      * Context for the current test case.
122      */
123     private Context ctx;
124
125     /**
126      * Setup.
127      */
128     @Before
129     public void setUp() {
130         ctx = null;
131     }
132
133     /**
134      * Tear down.
135      */
136     @After
137     public void tearDown() {
138         if (ctx != null) {
139             ctx.destroy();
140         }
141     }
142
143     @Test
144     public void test_SingleHost() throws Exception {
145         run(70, 1);
146     }
147
148     @Test
149     public void test_TwoHosts() throws Exception {
150         run(200, 2);
151     }
152
153     @Test
154     public void test_ThreeHosts() throws Exception {
155         run(200, 3);
156     }
157
158     private void run(int nmessages, int nhosts) throws Exception {
159         ctx = new Context(nmessages);
160
161         for (int x = 0; x < nhosts; ++x) {
162             ctx.addHost();
163         }
164
165         ctx.startHosts();
166
167         for (int x = 0; x < nmessages; ++x) {
168             ctx.offerExternal(makeMessage(x));
169         }
170
171         ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
172
173         assertEquals(0, ctx.getDecodeErrors());
174         assertEquals(0, ctx.getRemainingEvents());
175         ctx.checkAllSawAMsg();
176     }
177
178     private String makeMessage(int reqnum) {
179         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
180     }
181     
182     /**
183      * Invoke this to slow the timers down.
184      */
185     protected static void runSlow() {
186         stdReactivateWaitMs = 10000;
187         stdIdentificationMs = 10000;
188         stdStartHeartbeatMs = 15000;
189         stdActiveHeartbeatMs = 12000;
190         stdInterHeartbeatMs = 5000;
191         stdOfflinePubWaitMs = 2;
192         stdPollMs = 2;
193         stdInterPollMs = 2000;
194         stdEventWaitSec = 1000;
195     }
196
197     /**
198      * Decodes an event.
199      * 
200      * @param event event
201      * @return the decoded event, or {@code null} if it cannot be decoded
202      */
203     private static Object decodeEvent(String event) {
204         try {
205             return mapper.fromJson(event, TreeMap.class);
206
207         } catch (JsonParseException e) {
208             logger.warn("cannot decode external event", e);
209             return null;
210         }
211     }
212
213     /**
214      * Context used for a single test case.
215      */
216     private static class Context {
217
218         /**
219          * Hosts that have been added to this context.
220          */
221         private final Deque<Host> hosts = new LinkedList<>();
222
223         /**
224          * Maps a drools controller to its policy controller.
225          */
226         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
227
228         /**
229          * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
230          */
231         private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
232
233         /**
234          * Queue for the external "DMaaP" topic.
235          */
236         private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
237
238         /**
239          * Counts the number of decode errors.
240          */
241         private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
242
243         /**
244          * Number of events we're still waiting to receive.
245          */
246         private final CountDownLatch eventCounter;
247
248         /**
249          * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
250          * {@link #getCurrentHost()}.
251          */
252         private Host currentHost = null;
253
254         /**
255          * Constructor.
256          * 
257          * @param nEvents number of events to be processed
258          */
259         public Context(int events) {
260             eventCounter = new CountDownLatch(events);
261         }
262
263         /**
264          * Destroys the context, stopping any hosts that remain.
265          */
266         public void destroy() {
267             stopHosts();
268             hosts.clear();
269         }
270
271         /**
272          * Creates and adds a new host to the context.
273          * 
274          * @return the new Host
275          */
276         public Host addHost() {
277             Host host = new Host(this);
278             hosts.add(host);
279
280             return host;
281         }
282
283         /**
284          * Starts the hosts.
285          */
286         public void startHosts() {
287             hosts.forEach(host -> host.start());
288         }
289
290         /**
291          * Stops the hosts.
292          */
293         public void stopHosts() {
294             hosts.forEach(host -> host.stop());
295         }
296
297         /**
298          * Verifies that all hosts processed at least one message.
299          */
300         public void checkAllSawAMsg() {
301             int msgs = 0;
302             for (Host host : hosts) {
303                 assertTrue("msgs=" + msgs, host.messageSeen());
304                 ++msgs;
305             }
306         }
307
308         /**
309          * Sets {@link #currentHost} to the specified host, and then invokes the given
310          * function. Resets {@link #currentHost} to {@code null} before returning.
311          * 
312          * @param host host
313          * @param func function to invoke
314          */
315         public void withHost(Host host, VoidFunction func) {
316             currentHost = host;
317             func.apply();
318             currentHost = null;
319         }
320
321         /**
322          * Offers an event to the external topic.
323          * 
324          * @param event event
325          */
326         public void offerExternal(String event) {
327             externalTopic.offer(event);
328         }
329
330         /**
331          * Adds an internal channel to the set of channels.
332          * 
333          * @param channel channel
334          * @param queue the channel's queue
335          */
336         public void addInternal(String channel, BlockingQueue<String> queue) {
337             channel2queue.put(channel, queue);
338         }
339
340         /**
341          * Offers a message to all internal channels.
342          * 
343          * @param message message
344          */
345         public void offerInternal(String message) {
346             channel2queue.values().forEach(queue -> queue.offer(message));
347         }
348
349         /**
350          * Offers amessage to an internal channel.
351          * 
352          * @param channel channel
353          * @param message message
354          */
355         public void offerInternal(String channel, String message) {
356             BlockingQueue<String> queue = channel2queue.get(channel);
357             if (queue != null) {
358                 queue.offer(message);
359             }
360         }
361
362         /**
363          * Associates a controller with its drools controller.
364          * 
365          * @param controller controller
366          * @param droolsController drools controller
367          */
368         public void addController(PolicyController controller, DroolsController droolsController) {
369             drools2policy.put(droolsController, controller);
370         }
371
372         /**
373          * Get controller.
374          * 
375          * @param droolsController drools controller
376          * @return the controller associated with a drools controller, or {@code null} if
377          *         it has no associated controller
378          */
379         public PolicyController getController(DroolsController droolsController) {
380             return drools2policy.get(droolsController);
381         }
382
383         /**
384          * Constructor.
385          * 
386          * @return queue for the external topic
387          */
388         public BlockingQueue<String> getExternalTopic() {
389             return externalTopic;
390         }
391
392         /**
393          * Get decode errors.
394          * 
395          * @return the number of decode errors so far
396          */
397         public int getDecodeErrors() {
398             return numDecodeErrors.get();
399         }
400
401         /**
402          * Increments the count of decode errors.
403          */
404         public void bumpDecodeErrors() {
405             numDecodeErrors.incrementAndGet();
406         }
407
408         /**
409          * Get remaining events.
410          * 
411          * @return the number of events that haven't been processed
412          */
413         public long getRemainingEvents() {
414             return eventCounter.getCount();
415         }
416
417         /**
418          * Adds an event to the counter.
419          */
420         public void addEvent() {
421             eventCounter.countDown();
422         }
423
424         /**
425          * Waits, for a period of time, for all events to be processed.
426          * 
427          * @param time time
428          * @param units units
429          * @return {@code true} if all events have been processed, {@code false} otherwise
430          * @throws InterruptedException throws interrupted
431          */
432         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
433             return eventCounter.await(time, units);
434         }
435
436         /**
437          * Gets the current host, provided this is used from within a call to
438          * {@link #withHost(Host, VoidFunction)}.
439          * 
440          * @return the current host, or {@code null} if there is no current host
441          */
442         public Host getCurrentHost() {
443             return currentHost;
444         }
445     }
446
447     /**
448      * Simulates a single "host".
449      */
450     private static class Host {
451
452         private final Context context;
453
454         private final PoolingFeature feature;
455
456         /**
457          * {@code True} if this host has processed a message, {@code false} otherwise.
458          */
459         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
460
461         /**
462          * This host's internal "DMaaP" topic.
463          */
464         private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
465
466         /**
467          * Source that reads from the external topic and posts to the listener.
468          */
469         private TopicSource externalSource;
470
471         // mock objects
472         private final PolicyEngine engine = mock(PolicyEngine.class);
473         private final ListenerController controller = mock(ListenerController.class);
474         private final DroolsController drools = mock(DroolsController.class);
475
476         /**
477          * Constructor.
478          * 
479          * @param context context
480          */
481         public Host(Context context) {
482             this.context = context;
483
484             when(controller.getName()).thenReturn(CONTROLLER1);
485             when(controller.getDrools()).thenReturn(drools);
486
487             // stop consuming events if the controller stops
488             when(controller.stop()).thenAnswer(args -> {
489                 externalSource.unregister(controller);
490                 return true;
491             });
492
493             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
494
495             context.addController(controller, drools);
496
497             // arrange to read from the external topic
498             externalSource = new TopicSourceImpl(context, false);
499             
500             feature = new PoolingFeatureImpl(context);
501         }
502
503         /**
504          * Get name.
505          * 
506          * @return the host name
507          */
508         public String getName() {
509             return feature.getHost();
510         }
511
512         /**
513          * Starts threads for the host so that it begins consuming from both the external
514          * "DMaaP" topic and its own internal "DMaaP" topic.
515          */
516         public void start() {
517
518             context.withHost(this, () -> {
519
520                 feature.beforeStart(engine);
521                 feature.afterCreate(controller);
522
523                 // assign the queue for this host's internal topic
524                 context.addInternal(getName(), msgQueue);
525
526                 feature.beforeStart(controller);
527
528                 // start consuming events from the external topic
529                 externalSource.register(controller);
530
531                 feature.afterStart(controller);
532             });
533         }
534
535         /**
536          * Stops the host's threads.
537          */
538         public void stop() {
539             feature.beforeStop(controller);
540             externalSource.unregister(controller);
541             feature.afterStop(controller);
542         }
543
544         /**
545          * Offers an event to the feature, before the policy controller handles it.
546          * 
547          * @param protocol protocol
548          * @param topic2 topic
549          * @param event event
550          * @return {@code true} if the event was handled, {@code false} otherwise
551          */
552         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
553             return feature.beforeOffer(controller, protocol, topic2, event);
554         }
555
556         /**
557          * Offers an event to the feature, after the policy controller handles it.
558          * 
559          * @param protocol protocol
560          * @param topic topic
561          * @param event event
562          * @param success success
563          * @return {@code true} if the event was handled, {@code false} otherwise
564          */
565         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
566
567             return feature.afterOffer(controller, protocol, topic, event, success);
568         }
569
570         /**
571          * Offers an event to the feature, before the drools controller handles it.
572          * 
573          * @param fact fact
574          * @return {@code true} if the event was handled, {@code false} otherwise
575          */
576         public boolean beforeInsert(Object fact) {
577             return feature.beforeInsert(drools, fact);
578         }
579
580         /**
581          * Offers an event to the feature, after the drools controller handles it.
582          * 
583          * @param fact fact
584          * @param successInsert {@code true} if it was successfully inserted by the drools
585          *        controller, {@code false} otherwise
586          * @return {@code true} if the event was handled, {@code false} otherwise
587          */
588         public boolean afterInsert(Object fact, boolean successInsert) {
589             return feature.afterInsert(drools, fact, successInsert);
590         }
591
592         /**
593          * Indicates that a message was seen for this host.
594          */
595         public void sawMessage() {
596             sawMsg.set(true);
597         }
598
599         /**
600          * Message seen.
601          * 
602          * @return {@code true} if a message was seen for this host, {@code false}
603          *         otherwise
604          */
605         public boolean messageSeen() {
606             return sawMsg.get();
607         }
608
609         /**
610          * Get internal queue.
611          * 
612          * @return the queue associated with this host's internal topic
613          */
614         public BlockingQueue<String> getInternalQueue() {
615             return msgQueue;
616         }
617     }
618
619     /**
620      * Listener for the external topic. Simulates the actions taken by
621      * <i>AggregatedPolicyController.onTopicEvent</i>.
622      */
623     private static class MyExternalTopicListener implements Answer<Void> {
624
625         private final Context context;
626         private final Host host;
627
628         public MyExternalTopicListener(Context context, Host host) {
629             this.context = context;
630             this.host = host;
631         }
632
633         @Override
634         public Void answer(InvocationOnMock args) throws Throwable {
635             int index = 0;
636             CommInfrastructure commType = args.getArgument(index++);
637             String topic = args.getArgument(index++);
638             String event = args.getArgument(index++);
639
640             if (host.beforeOffer(commType, topic, event)) {
641                 return null;
642             }
643
644             boolean result;
645             Object fact = decodeEvent(event);
646
647             if (fact == null) {
648                 result = false;
649                 context.bumpDecodeErrors();
650
651             } else {
652                 result = true;
653
654                 if (!host.beforeInsert(fact)) {
655                     // feature did not handle it so we handle it here
656                     host.afterInsert(fact, result);
657
658                     host.sawMessage();
659                     context.addEvent();
660                 }
661             }
662
663             host.afterOffer(commType, topic, event, result);
664             return null;
665         }
666     }
667
668     /**
669      * Sink implementation that puts a message on the queue specified by the
670      * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
671      * message is placed on all queues.
672      */
673     private static class TopicSinkImpl extends TopicImpl implements TopicSink {
674
675         private final Context context;
676
677         /**
678          * Used to decode the messages so that the channel can be extracted.
679          */
680         private final Serializer serializer = new Serializer();
681
682         /**
683          * Constructor.
684          * 
685          * @param context context
686          */
687         public TopicSinkImpl(Context context) {
688             this.context = context;
689         }
690
691         @Override
692         public synchronized boolean send(String message) {
693             if (!isAlive()) {
694                 return false;
695             }
696
697             try {
698                 Message msg = serializer.decodeMsg(message);
699                 String channel = msg.getChannel();
700
701                 if (Message.ADMIN.equals(channel)) {
702                     // add to every queue
703                     context.offerInternal(message);
704
705                 } else {
706                     // add to a specific queue
707                     context.offerInternal(channel, message);
708                 }
709
710                 return true;
711
712             } catch (JsonParseException e) {
713                 logger.warn("could not decode message: {}", message);
714                 context.bumpDecodeErrors();
715                 return false;
716             }
717         }
718     }
719
720     /**
721      * Source implementation that reads from a queue associated with a topic.
722      */
723     private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
724
725         private final String topic;
726
727         /**
728          * Queue from which to retrieve messages.
729          */
730         private final BlockingQueue<String> queue;
731
732         /**
733          * Manages the current consumer thread. The "first" item is used as a trigger to
734          * tell the thread to stop processing, while the "second" item is triggered <i>by
735          * the thread</i> when it completes.
736          */
737         private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
738
739         /**
740          * Constructor.
741          * 
742          * @param context context
743          * @param internal {@code true} if to read from the internal topic, {@code false}
744          *        to read from the external topic
745          */
746         public TopicSourceImpl(Context context, boolean internal) {
747             if (internal) {
748                 this.topic = INTERNAL_TOPIC;
749                 this.queue = context.getCurrentHost().getInternalQueue();
750
751             } else {
752                 this.topic = EXTERNAL_TOPIC;
753                 this.queue = context.getExternalTopic();
754             }
755         }
756
757         @Override
758         public void setFilter(String filter) {
759             logger.info("topic filter set to: {}", filter);
760         }
761
762         @Override
763         public String getTopic() {
764             return topic;
765         }
766
767         @Override
768         public boolean offer(String event) {
769             throw new UnsupportedOperationException("offer topic source");
770         }
771
772         /**
773          * Starts a thread that takes messages from the queue and gives them to the
774          * listener. Stops the thread of any previously registered listener.
775          */
776         @Override
777         public void register(TopicListener listener) {
778             Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1));
779
780             reregister(newPair);
781
782             Thread thread = new Thread(() -> {
783
784                 try {
785                     do {
786                         processMessages(newPair.first(), listener);
787                     } 
788                     while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
789
790                     logger.info("topic source thread completed");
791
792                 } catch (InterruptedException e) {
793                     logger.warn("topic source thread aborted", e);
794                     Thread.currentThread().interrupt();
795
796                 } catch (RuntimeException e) {
797                     logger.warn("topic source thread aborted", e);
798                 }
799
800                 newPair.second().countDown();
801
802             });
803
804             thread.setDaemon(true);
805             thread.start();
806         }
807
808         /**
809          * Stops the thread of <i>any</i> currently registered listener.
810          */
811         @Override
812         public void unregister(TopicListener listener) {
813             reregister(null);
814         }
815
816         /**
817          * Registers a new "pair" with this source, stopping the consumer associated with
818          * any previous registration.
819          * 
820          * @param newPair the new "pair", or {@code null} to unregister
821          */
822         private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
823             try {
824                 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
825                 if (oldPair == null) {
826                     if (newPair == null) {
827                         // unregister was invoked twice in a row
828                         logger.warn("re-unregister for topic source");
829                     }
830
831                     // no previous thread to stop
832                     return;
833                 }
834
835                 // need to stop the previous thread
836
837                 // tell it to stop
838                 oldPair.first().countDown();
839
840                 // wait for it to stop
841                 if (!oldPair.second().await(2, TimeUnit.SECONDS)) {
842                     logger.warn("old topic registration is still running");
843                 }
844
845             } catch (InterruptedException e) {
846                 logger.warn("old topic registration may still be running", e);
847                 Thread.currentThread().interrupt();
848             }
849
850             if (newPair != null) {
851                 // register was invoked twice in a row
852                 logger.warn("re-register for topic source");
853             }
854         }
855
856         /**
857          * Polls for messages from the topic and offers them to the listener.
858          * 
859          * @param stopped triggered if processing should stop
860          * @param listener listener
861          * @throws InterruptedException throws interrupted exception
862          */
863         private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
864
865             for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
866
867                 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
868                 if (msg == null) {
869                     return;
870                 }
871
872                 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
873             }
874         }
875     }
876
877     /**
878      * Topic implementation. Most methods just throw
879      * {@link UnsupportedOperationException}.
880      */
881     private static class TopicImpl implements Topic {
882
883         /**
884          * Constructor.
885          */
886         public TopicImpl() {
887             super();
888         }
889
890         @Override
891         public String getTopic() {
892             return INTERNAL_TOPIC;
893         }
894
895         @Override
896         public CommInfrastructure getTopicCommInfrastructure() {
897             throw new UnsupportedOperationException("topic protocol");
898         }
899
900         @Override
901         public List<String> getServers() {
902             throw new UnsupportedOperationException("topic servers");
903         }
904
905         @Override
906         public String[] getRecentEvents() {
907             throw new UnsupportedOperationException("topic events");
908         }
909
910         @Override
911         public void register(TopicListener topicListener) {
912             throw new UnsupportedOperationException("register topic");
913         }
914
915         @Override
916         public void unregister(TopicListener topicListener) {
917             throw new UnsupportedOperationException("unregister topic");
918         }
919
920         @Override
921         public synchronized boolean start() {
922             return true;
923         }
924
925         @Override
926         public synchronized boolean stop() {
927             return true;
928         }
929
930         @Override
931         public synchronized void shutdown() {
932             // do nothing
933         }
934
935         @Override
936         public synchronized boolean isAlive() {
937             return true;
938         }
939
940         @Override
941         public boolean lock() {
942             throw new UnsupportedOperationException("lock topicink");
943         }
944
945         @Override
946         public boolean unlock() {
947             throw new UnsupportedOperationException("unlock topic");
948         }
949
950         @Override
951         public boolean isLocked() {
952             throw new UnsupportedOperationException("topic isLocked");
953         }
954     }
955
956     /**
957      * Feature with overrides.
958      */
959     private static class PoolingFeatureImpl extends PoolingFeature {
960
961         private final Context context;
962
963         /**
964          * Constructor.
965          * 
966          * @param context context
967          */
968         public PoolingFeatureImpl(Context context) {
969             this.context = context;
970
971             /*
972              * Note: do NOT extract anything from "context" at this point, because it
973              * hasn't been fully initialized yet
974              */
975         }
976
977         @Override
978         public Properties getProperties(String featName) {
979             Properties props = new Properties();
980
981             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
982
983             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
984             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
985             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
986             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
987             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
988                             "" + stdOfflinePubWaitMs);
989             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
990                             "" + stdStartHeartbeatMs);
991             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
992             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
993             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
994                             "" + stdActiveHeartbeatMs);
995             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
996                             "" + stdInterHeartbeatMs);
997
998             return props;
999         }
1000
1001         @Override
1002         public PolicyController getController(DroolsController droolsController) {
1003             return context.getController(droolsController);
1004         }
1005
1006         /**
1007          * Embeds a specializer within a property name, after the prefix.
1008          * 
1009          * @param propnm property name into which it should be embedded
1010          * @param spec specializer to be embedded
1011          * @return the property name, with the specializer embedded within it
1012          */
1013         private String specialize(String propnm, String spec) {
1014             String suffix = propnm.substring(PREFIX.length());
1015             return PREFIX + spec + "." + suffix;
1016         }
1017
1018         @Override
1019         protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
1020                         CountDownLatch activeLatch) {
1021
1022             currentContext.set(context);
1023             
1024             return new PoolingManagerTest(host, controller, props, activeLatch);
1025         }
1026     }
1027
1028     /**
1029      * Pooling Manager with overrides.
1030      */
1031     private static class PoolingManagerTest extends PoolingManagerImpl {
1032
1033         /**
1034          * Constructor.
1035          * 
1036          * @param host the host
1037          * @param controller the controller
1038          * @param props the properties
1039          * @param activeLatch the latch
1040          */
1041         public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
1042                         CountDownLatch activeLatch) {
1043
1044             super(host, controller, props, activeLatch);
1045         }
1046
1047         @Override
1048         protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
1049             return new DmaapManagerImpl(topic);
1050         }
1051
1052         @Override
1053         protected boolean canDecodeEvent(DroolsController drools, String topic) {
1054             return true;
1055         }
1056
1057         @Override
1058         protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
1059             return decodeEvent(event);
1060         }
1061     }
1062
1063     /**
1064      * DMaaP Manager with overrides.
1065      */
1066     private static class DmaapManagerImpl extends DmaapManager {
1067
1068         /**
1069          * Constructor.
1070          * 
1071          * @param context this manager's context
1072          * @param topic the topic
1073          * @throws PoolingFeatureException if an error occurs
1074          */
1075         public DmaapManagerImpl(String topic) throws PoolingFeatureException {
1076             super(topic);
1077         }
1078
1079         @Override
1080         protected List<TopicSource> getTopicSources() {
1081             return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
1082         }
1083
1084         @Override
1085         protected List<TopicSink> getTopicSinks() {
1086             return Arrays.asList(new TopicSinkImpl(currentContext.get()));
1087         }
1088     }
1089
1090     /**
1091      * Controller that also implements the {@link TopicListener} interface.
1092      */
1093     private static interface ListenerController extends PolicyController, TopicListener {
1094
1095     }
1096
1097     /**
1098      * Simple function that takes no arguments and returns nothing.
1099      */
1100     @FunctionalInterface
1101     private static interface VoidFunction {
1102
1103         public void apply();
1104     }
1105 }