2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
31 import com.google.gson.Gson;
32 import com.google.gson.JsonParseException;
33 import java.util.Arrays;
34 import java.util.Deque;
35 import java.util.IdentityHashMap;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Properties;
39 import java.util.TreeMap;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.LinkedBlockingQueue;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.atomic.AtomicReference;
49 import org.junit.After;
50 import org.junit.Before;
51 import org.junit.Test;
52 import org.mockito.invocation.InvocationOnMock;
53 import org.mockito.stubbing.Answer;
54 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
55 import org.onap.policy.common.endpoints.event.comm.Topic;
56 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
57 import org.onap.policy.common.endpoints.event.comm.TopicListener;
58 import org.onap.policy.common.endpoints.event.comm.TopicSink;
59 import org.onap.policy.common.endpoints.event.comm.TopicSource;
60 import org.onap.policy.drools.controller.DroolsController;
61 import org.onap.policy.drools.pooling.message.Message;
62 import org.onap.policy.drools.system.PolicyController;
63 import org.onap.policy.drools.system.PolicyEngine;
64 import org.onap.policy.drools.utils.Pair;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
69 * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
70 * its own feature object. Uses real feature objects. However, the following are not:
72 * <dt>DMaaP sources and sinks</dt>
73 * <dd>simulated using queues. There is one queue for the external topic, and one queue
74 * for each host's internal topic. Messages published to the "admin" channel are simply
75 * sent to all of the hosts' internal topic queues</dd>
76 * <dt>PolicyEngine, PolicyController, DroolsController</dt>
80 * <p>Invoke {@link #runSlow()}, before the test, to slow things down.
82 public class FeatureTest {
84 private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
87 * Name of the topic used for inter-host communication.
89 private static final String INTERNAL_TOPIC = "my.internal.topic";
92 * Name of the topic from which "external" events "arrive".
94 private static final String EXTERNAL_TOPIC = "my.external.topic";
97 * Name of the controller.
99 private static final String CONTROLLER1 = "controller.one";
101 private static long stdReactivateWaitMs = 200;
102 private static long stdIdentificationMs = 60;
103 private static long stdStartHeartbeatMs = 60;
104 private static long stdActiveHeartbeatMs = 50;
105 private static long stdInterHeartbeatMs = 5;
106 private static long stdOfflinePubWaitMs = 2;
107 private static long stdPollMs = 2;
108 private static long stdInterPollMs = 2;
109 private static long stdEventWaitSec = 10;
111 * Used to decode events from the external topic.
113 private static final Gson mapper = new Gson();
116 * Used to identify the current context.
118 private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
121 * Context for the current test case.
129 public void setUp() {
137 public void tearDown() {
144 public void test_SingleHost() throws Exception {
149 public void test_TwoHosts() throws Exception {
154 public void test_ThreeHosts() throws Exception {
158 private void run(int nmessages, int nhosts) throws Exception {
159 ctx = new Context(nmessages);
161 for (int x = 0; x < nhosts; ++x) {
167 for (int x = 0; x < nmessages; ++x) {
168 ctx.offerExternal(makeMessage(x));
171 ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
173 assertEquals(0, ctx.getDecodeErrors());
174 assertEquals(0, ctx.getRemainingEvents());
175 ctx.checkAllSawAMsg();
178 private String makeMessage(int reqnum) {
179 return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
183 * Invoke this to slow the timers down.
185 protected static void runSlow() {
186 stdReactivateWaitMs = 10000;
187 stdIdentificationMs = 10000;
188 stdStartHeartbeatMs = 15000;
189 stdActiveHeartbeatMs = 12000;
190 stdInterHeartbeatMs = 5000;
191 stdOfflinePubWaitMs = 2;
193 stdInterPollMs = 2000;
194 stdEventWaitSec = 1000;
201 * @return the decoded event, or {@code null} if it cannot be decoded
203 private static Object decodeEvent(String event) {
205 return mapper.fromJson(event, TreeMap.class);
207 } catch (JsonParseException e) {
208 logger.warn("cannot decode external event", e);
214 * Context used for a single test case.
216 private static class Context {
219 * Hosts that have been added to this context.
221 private final Deque<Host> hosts = new LinkedList<>();
224 * Maps a drools controller to its policy controller.
226 private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
229 * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
231 private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
234 * Queue for the external "DMaaP" topic.
236 private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
239 * Counts the number of decode errors.
241 private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
244 * Number of events we're still waiting to receive.
246 private final CountDownLatch eventCounter;
249 * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
250 * {@link #getCurrentHost()}.
252 private Host currentHost = null;
257 * @param nEvents number of events to be processed
259 public Context(int events) {
260 eventCounter = new CountDownLatch(events);
264 * Destroys the context, stopping any hosts that remain.
266 public void destroy() {
272 * Creates and adds a new host to the context.
274 * @return the new Host
276 public Host addHost() {
277 Host host = new Host(this);
286 public void startHosts() {
287 hosts.forEach(host -> host.start());
293 public void stopHosts() {
294 hosts.forEach(host -> host.stop());
298 * Verifies that all hosts processed at least one message.
300 public void checkAllSawAMsg() {
302 for (Host host : hosts) {
303 assertTrue("msgs=" + msgs, host.messageSeen());
309 * Sets {@link #currentHost} to the specified host, and then invokes the given
310 * function. Resets {@link #currentHost} to {@code null} before returning.
313 * @param func function to invoke
315 public void withHost(Host host, VoidFunction func) {
322 * Offers an event to the external topic.
326 public void offerExternal(String event) {
327 externalTopic.offer(event);
331 * Adds an internal channel to the set of channels.
333 * @param channel channel
334 * @param queue the channel's queue
336 public void addInternal(String channel, BlockingQueue<String> queue) {
337 channel2queue.put(channel, queue);
341 * Offers a message to all internal channels.
343 * @param message message
345 public void offerInternal(String message) {
346 channel2queue.values().forEach(queue -> queue.offer(message));
350 * Offers amessage to an internal channel.
352 * @param channel channel
353 * @param message message
355 public void offerInternal(String channel, String message) {
356 BlockingQueue<String> queue = channel2queue.get(channel);
358 queue.offer(message);
363 * Associates a controller with its drools controller.
365 * @param controller controller
366 * @param droolsController drools controller
368 public void addController(PolicyController controller, DroolsController droolsController) {
369 drools2policy.put(droolsController, controller);
375 * @param droolsController drools controller
376 * @return the controller associated with a drools controller, or {@code null} if
377 * it has no associated controller
379 public PolicyController getController(DroolsController droolsController) {
380 return drools2policy.get(droolsController);
386 * @return queue for the external topic
388 public BlockingQueue<String> getExternalTopic() {
389 return externalTopic;
395 * @return the number of decode errors so far
397 public int getDecodeErrors() {
398 return numDecodeErrors.get();
402 * Increments the count of decode errors.
404 public void bumpDecodeErrors() {
405 numDecodeErrors.incrementAndGet();
409 * Get remaining events.
411 * @return the number of events that haven't been processed
413 public long getRemainingEvents() {
414 return eventCounter.getCount();
418 * Adds an event to the counter.
420 public void addEvent() {
421 eventCounter.countDown();
425 * Waits, for a period of time, for all events to be processed.
429 * @return {@code true} if all events have been processed, {@code false} otherwise
430 * @throws InterruptedException throws interrupted
432 public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
433 return eventCounter.await(time, units);
437 * Gets the current host, provided this is used from within a call to
438 * {@link #withHost(Host, VoidFunction)}.
440 * @return the current host, or {@code null} if there is no current host
442 public Host getCurrentHost() {
448 * Simulates a single "host".
450 private static class Host {
452 private final Context context;
454 private final PoolingFeature feature;
457 * {@code True} if this host has processed a message, {@code false} otherwise.
459 private final AtomicBoolean sawMsg = new AtomicBoolean(false);
462 * This host's internal "DMaaP" topic.
464 private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
467 * Source that reads from the external topic and posts to the listener.
469 private TopicSource externalSource;
472 private final PolicyEngine engine = mock(PolicyEngine.class);
473 private final ListenerController controller = mock(ListenerController.class);
474 private final DroolsController drools = mock(DroolsController.class);
479 * @param context context
481 public Host(Context context) {
482 this.context = context;
484 when(controller.getName()).thenReturn(CONTROLLER1);
485 when(controller.getDrools()).thenReturn(drools);
487 // stop consuming events if the controller stops
488 when(controller.stop()).thenAnswer(args -> {
489 externalSource.unregister(controller);
493 doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
495 context.addController(controller, drools);
497 // arrange to read from the external topic
498 externalSource = new TopicSourceImpl(context, false);
500 feature = new PoolingFeatureImpl(context);
506 * @return the host name
508 public String getName() {
509 return feature.getHost();
513 * Starts threads for the host so that it begins consuming from both the external
514 * "DMaaP" topic and its own internal "DMaaP" topic.
516 public void start() {
518 context.withHost(this, () -> {
520 feature.beforeStart(engine);
521 feature.afterCreate(controller);
523 // assign the queue for this host's internal topic
524 context.addInternal(getName(), msgQueue);
526 feature.beforeStart(controller);
528 // start consuming events from the external topic
529 externalSource.register(controller);
531 feature.afterStart(controller);
536 * Stops the host's threads.
539 feature.beforeStop(controller);
540 externalSource.unregister(controller);
541 feature.afterStop(controller);
545 * Offers an event to the feature, before the policy controller handles it.
547 * @param protocol protocol
548 * @param topic2 topic
550 * @return {@code true} if the event was handled, {@code false} otherwise
552 public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
553 return feature.beforeOffer(controller, protocol, topic2, event);
557 * Offers an event to the feature, after the policy controller handles it.
559 * @param protocol protocol
562 * @param success success
563 * @return {@code true} if the event was handled, {@code false} otherwise
565 public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
567 return feature.afterOffer(controller, protocol, topic, event, success);
571 * Offers an event to the feature, before the drools controller handles it.
574 * @return {@code true} if the event was handled, {@code false} otherwise
576 public boolean beforeInsert(Object fact) {
577 return feature.beforeInsert(drools, fact);
581 * Offers an event to the feature, after the drools controller handles it.
584 * @param successInsert {@code true} if it was successfully inserted by the drools
585 * controller, {@code false} otherwise
586 * @return {@code true} if the event was handled, {@code false} otherwise
588 public boolean afterInsert(Object fact, boolean successInsert) {
589 return feature.afterInsert(drools, fact, successInsert);
593 * Indicates that a message was seen for this host.
595 public void sawMessage() {
602 * @return {@code true} if a message was seen for this host, {@code false}
605 public boolean messageSeen() {
610 * Get internal queue.
612 * @return the queue associated with this host's internal topic
614 public BlockingQueue<String> getInternalQueue() {
620 * Listener for the external topic. Simulates the actions taken by
621 * <i>AggregatedPolicyController.onTopicEvent</i>.
623 private static class MyExternalTopicListener implements Answer<Void> {
625 private final Context context;
626 private final Host host;
628 public MyExternalTopicListener(Context context, Host host) {
629 this.context = context;
634 public Void answer(InvocationOnMock args) throws Throwable {
636 CommInfrastructure commType = args.getArgument(index++);
637 String topic = args.getArgument(index++);
638 String event = args.getArgument(index++);
640 if (host.beforeOffer(commType, topic, event)) {
645 Object fact = decodeEvent(event);
649 context.bumpDecodeErrors();
654 if (!host.beforeInsert(fact)) {
655 // feature did not handle it so we handle it here
656 host.afterInsert(fact, result);
663 host.afterOffer(commType, topic, event, result);
669 * Sink implementation that puts a message on the queue specified by the
670 * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
671 * message is placed on all queues.
673 private static class TopicSinkImpl extends TopicImpl implements TopicSink {
675 private final Context context;
678 * Used to decode the messages so that the channel can be extracted.
680 private final Serializer serializer = new Serializer();
685 * @param context context
687 public TopicSinkImpl(Context context) {
688 this.context = context;
692 public synchronized boolean send(String message) {
698 Message msg = serializer.decodeMsg(message);
699 String channel = msg.getChannel();
701 if (Message.ADMIN.equals(channel)) {
702 // add to every queue
703 context.offerInternal(message);
706 // add to a specific queue
707 context.offerInternal(channel, message);
712 } catch (JsonParseException e) {
713 logger.warn("could not decode message: {}", message);
714 context.bumpDecodeErrors();
721 * Source implementation that reads from a queue associated with a topic.
723 private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
725 private final String topic;
728 * Queue from which to retrieve messages.
730 private final BlockingQueue<String> queue;
733 * Manages the current consumer thread. The "first" item is used as a trigger to
734 * tell the thread to stop processing, while the "second" item is triggered <i>by
735 * the thread</i> when it completes.
737 private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
742 * @param context context
743 * @param internal {@code true} if to read from the internal topic, {@code false}
744 * to read from the external topic
746 public TopicSourceImpl(Context context, boolean internal) {
748 this.topic = INTERNAL_TOPIC;
749 this.queue = context.getCurrentHost().getInternalQueue();
752 this.topic = EXTERNAL_TOPIC;
753 this.queue = context.getExternalTopic();
758 public void setFilter(String filter) {
759 logger.info("topic filter set to: {}", filter);
763 public String getTopic() {
768 public boolean offer(String event) {
769 throw new UnsupportedOperationException("offer topic source");
773 * Starts a thread that takes messages from the queue and gives them to the
774 * listener. Stops the thread of any previously registered listener.
777 public void register(TopicListener listener) {
778 Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1));
782 Thread thread = new Thread(() -> {
786 processMessages(newPair.first(), listener);
788 while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
790 logger.info("topic source thread completed");
792 } catch (InterruptedException e) {
793 logger.warn("topic source thread aborted", e);
794 Thread.currentThread().interrupt();
796 } catch (RuntimeException e) {
797 logger.warn("topic source thread aborted", e);
800 newPair.second().countDown();
804 thread.setDaemon(true);
809 * Stops the thread of <i>any</i> currently registered listener.
812 public void unregister(TopicListener listener) {
817 * Registers a new "pair" with this source, stopping the consumer associated with
818 * any previous registration.
820 * @param newPair the new "pair", or {@code null} to unregister
822 private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
824 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
825 if (oldPair == null) {
826 if (newPair == null) {
827 // unregister was invoked twice in a row
828 logger.warn("re-unregister for topic source");
831 // no previous thread to stop
835 // need to stop the previous thread
838 oldPair.first().countDown();
840 // wait for it to stop
841 if (!oldPair.second().await(2, TimeUnit.SECONDS)) {
842 logger.warn("old topic registration is still running");
845 } catch (InterruptedException e) {
846 logger.warn("old topic registration may still be running", e);
847 Thread.currentThread().interrupt();
850 if (newPair != null) {
851 // register was invoked twice in a row
852 logger.warn("re-register for topic source");
857 * Polls for messages from the topic and offers them to the listener.
859 * @param stopped triggered if processing should stop
860 * @param listener listener
861 * @throws InterruptedException throws interrupted exception
863 private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
865 for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
867 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
872 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
878 * Topic implementation. Most methods just throw
879 * {@link UnsupportedOperationException}.
881 private static class TopicImpl implements Topic {
891 public String getTopic() {
892 return INTERNAL_TOPIC;
896 public CommInfrastructure getTopicCommInfrastructure() {
897 throw new UnsupportedOperationException("topic protocol");
901 public List<String> getServers() {
902 throw new UnsupportedOperationException("topic servers");
906 public String[] getRecentEvents() {
907 throw new UnsupportedOperationException("topic events");
911 public void register(TopicListener topicListener) {
912 throw new UnsupportedOperationException("register topic");
916 public void unregister(TopicListener topicListener) {
917 throw new UnsupportedOperationException("unregister topic");
921 public synchronized boolean start() {
926 public synchronized boolean stop() {
931 public synchronized void shutdown() {
936 public synchronized boolean isAlive() {
941 public boolean lock() {
942 throw new UnsupportedOperationException("lock topicink");
946 public boolean unlock() {
947 throw new UnsupportedOperationException("unlock topic");
951 public boolean isLocked() {
952 throw new UnsupportedOperationException("topic isLocked");
957 * Feature with overrides.
959 private static class PoolingFeatureImpl extends PoolingFeature {
961 private final Context context;
966 * @param context context
968 public PoolingFeatureImpl(Context context) {
969 this.context = context;
972 * Note: do NOT extract anything from "context" at this point, because it
973 * hasn't been fully initialized yet
978 public Properties getProperties(String featName) {
979 Properties props = new Properties();
981 props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
983 props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
984 props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
985 props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
986 props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
987 props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
988 "" + stdOfflinePubWaitMs);
989 props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
990 "" + stdStartHeartbeatMs);
991 props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
992 props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
993 props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
994 "" + stdActiveHeartbeatMs);
995 props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
996 "" + stdInterHeartbeatMs);
1002 public PolicyController getController(DroolsController droolsController) {
1003 return context.getController(droolsController);
1007 * Embeds a specializer within a property name, after the prefix.
1009 * @param propnm property name into which it should be embedded
1010 * @param spec specializer to be embedded
1011 * @return the property name, with the specializer embedded within it
1013 private String specialize(String propnm, String spec) {
1014 String suffix = propnm.substring(PREFIX.length());
1015 return PREFIX + spec + "." + suffix;
1019 protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
1020 CountDownLatch activeLatch) {
1022 currentContext.set(context);
1024 return new PoolingManagerTest(host, controller, props, activeLatch);
1029 * Pooling Manager with overrides.
1031 private static class PoolingManagerTest extends PoolingManagerImpl {
1036 * @param host the host
1037 * @param controller the controller
1038 * @param props the properties
1039 * @param activeLatch the latch
1041 public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
1042 CountDownLatch activeLatch) {
1044 super(host, controller, props, activeLatch);
1048 protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
1049 return new DmaapManagerImpl(topic);
1053 protected boolean canDecodeEvent(DroolsController drools, String topic) {
1058 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
1059 return decodeEvent(event);
1064 * DMaaP Manager with overrides.
1066 private static class DmaapManagerImpl extends DmaapManager {
1071 * @param context this manager's context
1072 * @param topic the topic
1073 * @throws PoolingFeatureException if an error occurs
1075 public DmaapManagerImpl(String topic) throws PoolingFeatureException {
1080 protected List<TopicSource> getTopicSources() {
1081 return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
1085 protected List<TopicSink> getTopicSinks() {
1086 return Arrays.asList(new TopicSinkImpl(currentContext.get()));
1091 * Controller that also implements the {@link TopicListener} interface.
1093 private static interface ListenerController extends PolicyController, TopicListener {
1098 * Simple function that takes no arguments and returns nothing.
1100 @FunctionalInterface
1101 private static interface VoidFunction {
1103 public void apply();