2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
31 import com.fasterxml.jackson.core.type.TypeReference;
32 import com.fasterxml.jackson.databind.ObjectMapper;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Deque;
36 import java.util.IdentityHashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Properties;
40 import java.util.TreeMap;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentMap;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.LinkedBlockingQueue;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicInteger;
49 import java.util.concurrent.atomic.AtomicReference;
50 import org.junit.After;
51 import org.junit.Before;
52 import org.junit.Test;
53 import org.mockito.invocation.InvocationOnMock;
54 import org.mockito.stubbing.Answer;
55 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
56 import org.onap.policy.common.endpoints.event.comm.Topic;
57 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
58 import org.onap.policy.common.endpoints.event.comm.TopicListener;
59 import org.onap.policy.common.endpoints.event.comm.TopicSink;
60 import org.onap.policy.common.endpoints.event.comm.TopicSource;
61 import org.onap.policy.drools.controller.DroolsController;
62 import org.onap.policy.drools.pooling.message.Message;
63 import org.onap.policy.drools.system.PolicyController;
64 import org.onap.policy.drools.system.PolicyEngine;
65 import org.onap.policy.drools.utils.Pair;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
70 * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
71 * its own feature object. Uses real feature objects. However, the following are not:
73 * <dt>DMaaP sources and sinks</dt>
74 * <dd>simulated using queues. There is one queue for the external topic, and one queue
75 * for each host's internal topic. Messages published to the "admin" channel are simply
76 * sent to all of the hosts' internal topic queues</dd>
77 * <dt>PolicyEngine, PolicyController, DroolsController</dt>
81 * <p>Invoke {@link #runSlow()}, before the test, to slow things down.
83 public class FeatureTest {
85 private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
88 * Name of the topic used for inter-host communication.
90 private static final String INTERNAL_TOPIC = "my.internal.topic";
93 * Name of the topic from which "external" events "arrive".
95 private static final String EXTERNAL_TOPIC = "my.external.topic";
98 * Name of the controller.
100 private static final String CONTROLLER1 = "controller.one";
102 private static long stdReactivateWaitMs = 200;
103 private static long stdIdentificationMs = 60;
104 private static long stdStartHeartbeatMs = 60;
105 private static long stdActiveHeartbeatMs = 50;
106 private static long stdInterHeartbeatMs = 5;
107 private static long stdOfflinePubWaitMs = 2;
108 private static long stdPollMs = 2;
109 private static long stdInterPollMs = 2;
110 private static long stdEventWaitSec = 10;
113 * Used to decode events into a Map.
115 private static final TypeReference<TreeMap<String, String>> typeRef =
116 new TypeReference<TreeMap<String, String>>() {};
119 * Used to decode events from the external topic.
121 private static final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
123 protected ObjectMapper initialValue() {
124 return new ObjectMapper();
129 * Used to identify the current context.
131 private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
134 * Context for the current test case.
142 public void setUp() {
150 public void tearDown() {
157 public void test_SingleHost() throws Exception {
162 public void test_TwoHosts() throws Exception {
167 public void test_ThreeHosts() throws Exception {
171 private void run(int nmessages, int nhosts) throws Exception {
172 ctx = new Context(nmessages);
174 for (int x = 0; x < nhosts; ++x) {
180 for (int x = 0; x < nmessages; ++x) {
181 ctx.offerExternal(makeMessage(x));
184 ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
186 assertEquals(0, ctx.getDecodeErrors());
187 assertEquals(0, ctx.getRemainingEvents());
188 ctx.checkAllSawAMsg();
191 private String makeMessage(int reqnum) {
192 return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
196 * Invoke this to slow the timers down.
198 protected static void runSlow() {
199 stdReactivateWaitMs = 10000;
200 stdIdentificationMs = 10000;
201 stdStartHeartbeatMs = 15000;
202 stdActiveHeartbeatMs = 12000;
203 stdInterHeartbeatMs = 5000;
204 stdOfflinePubWaitMs = 2;
206 stdInterPollMs = 2000;
207 stdEventWaitSec = 1000;
214 * @return the decoded event, or {@code null} if it cannot be decoded
216 private static Object decodeEvent(String event) {
218 return mapper.get().readValue(event, typeRef);
220 } catch (IOException e) {
221 logger.warn("cannot decode external event", e);
227 * Context used for a single test case.
229 private static class Context {
232 * Hosts that have been added to this context.
234 private final Deque<Host> hosts = new LinkedList<>();
237 * Maps a drools controller to its policy controller.
239 private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
242 * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
244 private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
247 * Queue for the external "DMaaP" topic.
249 private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
252 * Counts the number of decode errors.
254 private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
257 * Number of events we're still waiting to receive.
259 private final CountDownLatch eventCounter;
262 * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
263 * {@link #getCurrentHost()}.
265 private Host currentHost = null;
270 * @param nEvents number of events to be processed
272 public Context(int events) {
273 eventCounter = new CountDownLatch(events);
277 * Destroys the context, stopping any hosts that remain.
279 public void destroy() {
285 * Creates and adds a new host to the context.
287 * @return the new Host
289 public Host addHost() {
290 Host host = new Host(this);
299 public void startHosts() {
300 hosts.forEach(host -> host.start());
306 public void stopHosts() {
307 hosts.forEach(host -> host.stop());
311 * Verifies that all hosts processed at least one message.
313 public void checkAllSawAMsg() {
315 for (Host host : hosts) {
316 assertTrue("msgs=" + msgs, host.messageSeen());
322 * Sets {@link #currentHost} to the specified host, and then invokes the given
323 * function. Resets {@link #currentHost} to {@code null} before returning.
326 * @param func function to invoke
328 public void withHost(Host host, VoidFunction func) {
335 * Offers an event to the external topic.
339 public void offerExternal(String event) {
340 externalTopic.offer(event);
344 * Adds an internal channel to the set of channels.
346 * @param channel channel
347 * @param queue the channel's queue
349 public void addInternal(String channel, BlockingQueue<String> queue) {
350 channel2queue.put(channel, queue);
354 * Offers a message to all internal channels.
356 * @param message message
358 public void offerInternal(String message) {
359 channel2queue.values().forEach(queue -> queue.offer(message));
363 * Offers amessage to an internal channel.
365 * @param channel channel
366 * @param message message
368 public void offerInternal(String channel, String message) {
369 BlockingQueue<String> queue = channel2queue.get(channel);
371 queue.offer(message);
376 * Associates a controller with its drools controller.
378 * @param controller controller
379 * @param droolsController drools controller
381 public void addController(PolicyController controller, DroolsController droolsController) {
382 drools2policy.put(droolsController, controller);
388 * @param droolsController drools controller
389 * @return the controller associated with a drools controller, or {@code null} if
390 * it has no associated controller
392 public PolicyController getController(DroolsController droolsController) {
393 return drools2policy.get(droolsController);
399 * @return queue for the external topic
401 public BlockingQueue<String> getExternalTopic() {
402 return externalTopic;
408 * @return the number of decode errors so far
410 public int getDecodeErrors() {
411 return numDecodeErrors.get();
415 * Increments the count of decode errors.
417 public void bumpDecodeErrors() {
418 numDecodeErrors.incrementAndGet();
422 * Get remaining events.
424 * @return the number of events that haven't been processed
426 public long getRemainingEvents() {
427 return eventCounter.getCount();
431 * Adds an event to the counter.
433 public void addEvent() {
434 eventCounter.countDown();
438 * Waits, for a period of time, for all events to be processed.
442 * @return {@code true} if all events have been processed, {@code false} otherwise
443 * @throws InterruptedException throws interrupted
445 public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
446 return eventCounter.await(time, units);
450 * Gets the current host, provided this is used from within a call to
451 * {@link #withHost(Host, VoidFunction)}.
453 * @return the current host, or {@code null} if there is no current host
455 public Host getCurrentHost() {
461 * Simulates a single "host".
463 private static class Host {
465 private final Context context;
467 private final PoolingFeature feature;
470 * {@code True} if this host has processed a message, {@code false} otherwise.
472 private final AtomicBoolean sawMsg = new AtomicBoolean(false);
475 * This host's internal "DMaaP" topic.
477 private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
480 * Source that reads from the external topic and posts to the listener.
482 private TopicSource externalSource;
485 private final PolicyEngine engine = mock(PolicyEngine.class);
486 private final ListenerController controller = mock(ListenerController.class);
487 private final DroolsController drools = mock(DroolsController.class);
492 * @param context context
494 public Host(Context context) {
495 this.context = context;
497 when(controller.getName()).thenReturn(CONTROLLER1);
498 when(controller.getDrools()).thenReturn(drools);
500 // stop consuming events if the controller stops
501 when(controller.stop()).thenAnswer(args -> {
502 externalSource.unregister(controller);
506 doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
508 context.addController(controller, drools);
510 // arrange to read from the external topic
511 externalSource = new TopicSourceImpl(context, false);
513 feature = new PoolingFeatureImpl(context);
519 * @return the host name
521 public String getName() {
522 return feature.getHost();
526 * Starts threads for the host so that it begins consuming from both the external
527 * "DMaaP" topic and its own internal "DMaaP" topic.
529 public void start() {
531 context.withHost(this, () -> {
533 feature.beforeStart(engine);
534 feature.afterCreate(controller);
536 // assign the queue for this host's internal topic
537 context.addInternal(getName(), msgQueue);
539 feature.beforeStart(controller);
541 // start consuming events from the external topic
542 externalSource.register(controller);
544 feature.afterStart(controller);
549 * Stops the host's threads.
552 feature.beforeStop(controller);
553 externalSource.unregister(controller);
554 feature.afterStop(controller);
558 * Offers an event to the feature, before the policy controller handles it.
560 * @param protocol protocol
561 * @param topic2 topic
563 * @return {@code true} if the event was handled, {@code false} otherwise
565 public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
566 return feature.beforeOffer(controller, protocol, topic2, event);
570 * Offers an event to the feature, after the policy controller handles it.
572 * @param protocol protocol
575 * @param success success
576 * @return {@code true} if the event was handled, {@code false} otherwise
578 public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
580 return feature.afterOffer(controller, protocol, topic, event, success);
584 * Offers an event to the feature, before the drools controller handles it.
587 * @return {@code true} if the event was handled, {@code false} otherwise
589 public boolean beforeInsert(Object fact) {
590 return feature.beforeInsert(drools, fact);
594 * Offers an event to the feature, after the drools controller handles it.
597 * @param successInsert {@code true} if it was successfully inserted by the drools
598 * controller, {@code false} otherwise
599 * @return {@code true} if the event was handled, {@code false} otherwise
601 public boolean afterInsert(Object fact, boolean successInsert) {
602 return feature.afterInsert(drools, fact, successInsert);
606 * Indicates that a message was seen for this host.
608 public void sawMessage() {
615 * @return {@code true} if a message was seen for this host, {@code false}
618 public boolean messageSeen() {
623 * Get internal queue.
625 * @return the queue associated with this host's internal topic
627 public BlockingQueue<String> getInternalQueue() {
633 * Listener for the external topic. Simulates the actions taken by
634 * <i>AggregatedPolicyController.onTopicEvent</i>.
636 private static class MyExternalTopicListener implements Answer<Void> {
638 private final Context context;
639 private final Host host;
641 public MyExternalTopicListener(Context context, Host host) {
642 this.context = context;
647 public Void answer(InvocationOnMock args) throws Throwable {
649 CommInfrastructure commType = args.getArgument(index++);
650 String topic = args.getArgument(index++);
651 String event = args.getArgument(index++);
653 if (host.beforeOffer(commType, topic, event)) {
658 Object fact = decodeEvent(event);
662 context.bumpDecodeErrors();
667 if (!host.beforeInsert(fact)) {
668 // feature did not handle it so we handle it here
669 host.afterInsert(fact, result);
676 host.afterOffer(commType, topic, event, result);
682 * Sink implementation that puts a message on the queue specified by the
683 * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
684 * message is placed on all queues.
686 private static class TopicSinkImpl extends TopicImpl implements TopicSink {
688 private final Context context;
691 * Used to decode the messages so that the channel can be extracted.
693 private final Serializer serializer = new Serializer();
698 * @param context context
700 public TopicSinkImpl(Context context) {
701 this.context = context;
705 public synchronized boolean send(String message) {
711 Message msg = serializer.decodeMsg(message);
712 String channel = msg.getChannel();
714 if (Message.ADMIN.equals(channel)) {
715 // add to every queue
716 context.offerInternal(message);
719 // add to a specific queue
720 context.offerInternal(channel, message);
725 } catch (IOException e) {
726 logger.warn("could not decode message: {}", message);
727 context.bumpDecodeErrors();
734 * Source implementation that reads from a queue associated with a topic.
736 private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
738 private final String topic;
741 * Queue from which to retrieve messages.
743 private final BlockingQueue<String> queue;
746 * Manages the current consumer thread. The "first" item is used as a trigger to
747 * tell the thread to stop processing, while the "second" item is triggered <i>by
748 * the thread</i> when it completes.
750 private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
755 * @param context context
756 * @param internal {@code true} if to read from the internal topic, {@code false}
757 * to read from the external topic
759 public TopicSourceImpl(Context context, boolean internal) {
761 this.topic = INTERNAL_TOPIC;
762 this.queue = context.getCurrentHost().getInternalQueue();
765 this.topic = EXTERNAL_TOPIC;
766 this.queue = context.getExternalTopic();
771 public void setFilter(String filter) {
772 logger.info("topic filter set to: {}", filter);
776 public String getTopic() {
781 public boolean offer(String event) {
782 throw new UnsupportedOperationException("offer topic source");
786 * Starts a thread that takes messages from the queue and gives them to the
787 * listener. Stops the thread of any previously registered listener.
790 public void register(TopicListener listener) {
791 Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1));
795 Thread thread = new Thread(() -> {
799 processMessages(newPair.first(), listener);
801 while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
803 logger.info("topic source thread completed");
805 } catch (InterruptedException e) {
806 logger.warn("topic source thread aborted", e);
807 Thread.currentThread().interrupt();
809 } catch (RuntimeException e) {
810 logger.warn("topic source thread aborted", e);
813 newPair.second().countDown();
817 thread.setDaemon(true);
822 * Stops the thread of <i>any</i> currently registered listener.
825 public void unregister(TopicListener listener) {
830 * Registers a new "pair" with this source, stopping the consumer associated with
831 * any previous registration.
833 * @param newPair the new "pair", or {@code null} to unregister
835 private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
837 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
838 if (oldPair == null) {
839 if (newPair == null) {
840 // unregister was invoked twice in a row
841 logger.warn("re-unregister for topic source");
844 // no previous thread to stop
848 // need to stop the previous thread
851 oldPair.first().countDown();
853 // wait for it to stop
854 if (!oldPair.second().await(2, TimeUnit.SECONDS)) {
855 logger.warn("old topic registration is still running");
858 } catch (InterruptedException e) {
859 logger.warn("old topic registration may still be running", e);
860 Thread.currentThread().interrupt();
863 if (newPair != null) {
864 // register was invoked twice in a row
865 logger.warn("re-register for topic source");
870 * Polls for messages from the topic and offers them to the listener.
872 * @param stopped triggered if processing should stop
873 * @param listener listener
874 * @throws InterruptedException throws interrupted exception
876 private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
878 for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
880 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
885 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
891 * Topic implementation. Most methods just throw
892 * {@link UnsupportedOperationException}.
894 private static class TopicImpl implements Topic {
904 public String getTopic() {
905 return INTERNAL_TOPIC;
909 public CommInfrastructure getTopicCommInfrastructure() {
910 throw new UnsupportedOperationException("topic protocol");
914 public List<String> getServers() {
915 throw new UnsupportedOperationException("topic servers");
919 public String[] getRecentEvents() {
920 throw new UnsupportedOperationException("topic events");
924 public void register(TopicListener topicListener) {
925 throw new UnsupportedOperationException("register topic");
929 public void unregister(TopicListener topicListener) {
930 throw new UnsupportedOperationException("unregister topic");
934 public synchronized boolean start() {
939 public synchronized boolean stop() {
944 public synchronized void shutdown() {
949 public synchronized boolean isAlive() {
954 public boolean lock() {
955 throw new UnsupportedOperationException("lock topicink");
959 public boolean unlock() {
960 throw new UnsupportedOperationException("unlock topic");
964 public boolean isLocked() {
965 throw new UnsupportedOperationException("topic isLocked");
970 * Feature with overrides.
972 private static class PoolingFeatureImpl extends PoolingFeature {
974 private final Context context;
979 * @param context context
981 public PoolingFeatureImpl(Context context) {
982 this.context = context;
985 * Note: do NOT extract anything from "context" at this point, because it
986 * hasn't been fully initialized yet
991 public Properties getProperties(String featName) {
992 Properties props = new Properties();
994 props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
996 props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
997 props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
998 props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
999 props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
1000 props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
1001 "" + stdOfflinePubWaitMs);
1002 props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
1003 "" + stdStartHeartbeatMs);
1004 props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
1005 props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
1006 props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
1007 "" + stdActiveHeartbeatMs);
1008 props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
1009 "" + stdInterHeartbeatMs);
1015 public PolicyController getController(DroolsController droolsController) {
1016 return context.getController(droolsController);
1020 * Embeds a specializer within a property name, after the prefix.
1022 * @param propnm property name into which it should be embedded
1023 * @param spec specializer to be embedded
1024 * @return the property name, with the specializer embedded within it
1026 private String specialize(String propnm, String spec) {
1027 String suffix = propnm.substring(PREFIX.length());
1028 return PREFIX + spec + "." + suffix;
1032 protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
1033 CountDownLatch activeLatch) {
1035 currentContext.set(context);
1037 return new PoolingManagerTest(host, controller, props, activeLatch);
1042 * Pooling Manager with overrides.
1044 private static class PoolingManagerTest extends PoolingManagerImpl {
1049 * @param host the host
1050 * @param controller the controller
1051 * @param props the properties
1052 * @param activeLatch the latch
1054 public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
1055 CountDownLatch activeLatch) {
1057 super(host, controller, props, activeLatch);
1061 protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
1062 return new DmaapManagerImpl(topic);
1066 protected boolean canDecodeEvent(DroolsController drools, String topic) {
1071 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
1072 return decodeEvent(event);
1077 * DMaaP Manager with overrides.
1079 private static class DmaapManagerImpl extends DmaapManager {
1084 * @param context this manager's context
1085 * @param topic the topic
1086 * @throws PoolingFeatureException if an error occurs
1088 public DmaapManagerImpl(String topic) throws PoolingFeatureException {
1093 protected List<TopicSource> getTopicSources() {
1094 return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
1098 protected List<TopicSink> getTopicSinks() {
1099 return Arrays.asList(new TopicSinkImpl(currentContext.get()));
1104 * Controller that also implements the {@link TopicListener} interface.
1106 private static interface ListenerController extends PolicyController, TopicListener {
1111 * Simple function that takes no arguments and returns nothing.
1113 @FunctionalInterface
1114 private static interface VoidFunction {
1116 public void apply();