2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
30 import java.io.IOException;
31 import java.util.Arrays;
32 import java.util.Deque;
33 import java.util.IdentityHashMap;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Properties;
37 import java.util.TreeMap;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentMap;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicInteger;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.After;
48 import org.junit.AfterClass;
49 import org.junit.Before;
50 import org.junit.BeforeClass;
51 import org.junit.Test;
52 import org.mockito.invocation.InvocationOnMock;
53 import org.mockito.stubbing.Answer;
54 import org.onap.policy.drools.controller.DroolsController;
55 import org.onap.policy.drools.event.comm.FilterableTopicSource;
56 import org.onap.policy.drools.event.comm.Topic;
57 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
58 import org.onap.policy.drools.event.comm.TopicListener;
59 import org.onap.policy.drools.event.comm.TopicSink;
60 import org.onap.policy.drools.event.comm.TopicSource;
61 import org.onap.policy.drools.pooling.message.Message;
62 import org.onap.policy.drools.system.PolicyController;
63 import org.onap.policy.drools.system.PolicyEngine;
64 import org.onap.policy.drools.utils.Pair;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67 import com.fasterxml.jackson.core.type.TypeReference;
68 import com.fasterxml.jackson.databind.ObjectMapper;
71 * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
72 * its own feature object. Uses real feature objects. However, the following are not:
74 * <dt>DMaaP sources and sinks</dt>
75 * <dd>simulated using queues. There is one queue for the external topic, and one queue
76 * for each host's internal topic. Messages published to the "admin" channel are simply
77 * sent to all of the hosts' internal topic queues</dd>
78 * <dt>PolicyEngine, PolicyController, DroolsController</dt>
82 public class FeatureTest {
84 private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
87 * Name of the topic used for inter-host communication.
89 private static final String INTERNAL_TOPIC = "my.internal.topic";
92 * Name of the topic from which "external" events "arrive".
94 private static final String EXTERNAL_TOPIC = "my.external.topic";
97 * Name of the controller.
99 private static final String CONTROLLER1 = "controller.one";
101 private static long stdReactivateWaitMs = 200;
102 private static long stdIdentificationMs = 60;
103 private static long stdStartHeartbeatMs = 60;
104 private static long stdActiveHeartbeatMs = 50;
105 private static long stdInterHeartbeatMs = 5;
106 private static long stdOfflinePubWaitMs = 2;
107 private static long stdPollMs = 2;
108 private static long stdInterPollMs = 2;
109 private static long stdEventWaitSec = 10;
111 // these are saved and restored on exit from this test class
112 private static PoolingFeature.Factory saveFeatureFactory;
113 private static PoolingManagerImpl.Factory saveManagerFactory;
114 private static DmaapManager.Factory saveDmaapFactory;
117 * Context for the current test case.
122 public static void setUpBeforeClass() {
123 saveFeatureFactory = PoolingFeature.getFactory();
124 saveManagerFactory = PoolingManagerImpl.getFactory();
125 saveDmaapFactory = DmaapManager.getFactory();
127 // note: invoke runSlow() to slow things down
131 public static void tearDownAfterClass() {
132 PoolingFeature.setFactory(saveFeatureFactory);
133 PoolingManagerImpl.setFactory(saveManagerFactory);
134 DmaapManager.setFactory(saveDmaapFactory);
138 public void setUp() {
143 public void tearDown() {
150 public void test_SingleHost() throws Exception {
155 public void test_TwoHosts() throws Exception {
160 public void test_ThreeHosts() throws Exception {
164 private void run(int nmessages, int nhosts) throws Exception {
165 ctx = new Context(nmessages);
167 for (int x = 0; x < nhosts; ++x) {
173 for (int x = 0; x < nmessages; ++x) {
174 ctx.offerExternal(makeMessage(x));
177 ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
179 assertEquals(0, ctx.getDecodeErrors());
180 assertEquals(0, ctx.getRemainingEvents());
181 ctx.checkAllSawAMsg();
184 private String makeMessage(int reqnum) {
185 return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
189 * Invoke this to slow the timers down.
191 protected static void runSlow() {
192 stdReactivateWaitMs = 10000;
193 stdIdentificationMs = 10000;
194 stdStartHeartbeatMs = 15000;
195 stdActiveHeartbeatMs = 12000;
196 stdInterHeartbeatMs = 5000;
197 stdOfflinePubWaitMs = 2;
199 stdInterPollMs = 2000;
200 stdEventWaitSec = 1000;
204 * Context used for a single test case.
206 private static class Context {
208 private final FeatureFactory featureFactory;
209 private final ManagerFactory managerFactory;
210 private final DmaapFactory dmaapFactory;
213 * Hosts that have been added to this context.
215 private final Deque<Host> hosts = new LinkedList<>();
218 * Maps a drools controller to its policy controller.
220 private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
223 * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
225 private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
228 * Queue for the external "DMaaP" topic.
230 private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
233 * Counts the number of decode errors.
235 private final AtomicInteger nDecodeErrors = new AtomicInteger(0);
238 * Number of events we're still waiting to receive.
240 private final CountDownLatch eventCounter;
243 * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
244 * {@link #getCurrentHost()}.
246 private Host currentHost = null;
250 * @param nEvents number of events to be processed
252 public Context(int nEvents) {
253 featureFactory = new FeatureFactory(this);
254 managerFactory = new ManagerFactory(this);
255 dmaapFactory = new DmaapFactory(this);
256 eventCounter = new CountDownLatch(nEvents);
258 PoolingFeature.setFactory(featureFactory);
259 PoolingManagerImpl.setFactory(managerFactory);
260 DmaapManager.setFactory(dmaapFactory);
264 * Destroys the context, stopping any hosts that remain.
266 public void destroy() {
272 * Creates and adds a new host to the context.
274 * @return the new Host
276 public Host addHost() {
277 Host host = new Host(this);
286 public void startHosts() {
287 hosts.forEach(host -> host.start());
293 public void stopHosts() {
294 hosts.forEach(host -> host.stop());
298 * Verifies that all hosts processed at least one message.
300 public void checkAllSawAMsg() {
302 for (Host host : hosts) {
303 assertTrue("x=" + x, host.messageSeen());
309 * Sets {@link #currentHost} to the specified host, and then invokes the given
310 * function. Resets {@link #currentHost} to {@code null} before returning.
313 * @param func function to invoke
315 public void withHost(Host host, VoidFunction func) {
322 * Offers an event to the external topic.
326 public void offerExternal(String event) {
327 externalTopic.offer(event);
331 * Adds an internal channel to the set of channels.
334 * @param queue the channel's queue
336 public void addInternal(String channel, BlockingQueue<String> queue) {
337 channel2queue.put(channel, queue);
341 * Offers a message to all internal channels.
345 public void offerInternal(String message) {
346 channel2queue.values().forEach(queue -> queue.offer(message));
350 * Offers amessage to an internal channel.
355 public void offerInternal(String channel, String message) {
356 BlockingQueue<String> queue = channel2queue.get(channel);
358 queue.offer(message);
366 * @return the decoded event, or {@code null} if it cannot be decoded
368 public Object decodeEvent(String event) {
369 return managerFactory.decodeEvent(null, null, event);
373 * Associates a controller with its drools controller.
376 * @param droolsController
378 public void addController(PolicyController controller, DroolsController droolsController) {
379 drools2policy.put(droolsController, controller);
383 * @param droolsController
384 * @return the controller associated with a drools controller, or {@code null} if
385 * it has no associated controller
387 public PolicyController getController(DroolsController droolsController) {
388 return drools2policy.get(droolsController);
392 * @return queue for the external topic
394 public BlockingQueue<String> getExternalTopic() {
395 return externalTopic;
400 * @return the number of decode errors so far
402 public int getDecodeErrors() {
403 return nDecodeErrors.get();
407 * Increments the count of decode errors.
409 public void bumpDecodeErrors() {
410 nDecodeErrors.incrementAndGet();
415 * @return the number of events that haven't been processed
417 public long getRemainingEvents() {
418 return eventCounter.getCount();
422 * Adds an event to the counter.
424 public void addEvent() {
425 eventCounter.countDown();
429 * Waits, for a period of time, for all events to be processed.
433 * @return {@code true} if all events have been processed, {@code false} otherwise
434 * @throws InterruptedException
436 public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
437 return eventCounter.await(time, units);
441 * Gets the current host, provided this is used from within a call to
442 * {@link #withHost(Host, VoidFunction)}.
444 * @return the current host, or {@code null} if there is no current host
446 public Host getCurrentHost() {
452 * Simulates a single "host".
454 private static class Host {
456 private final Context context;
458 private final PoolingFeature feature = new PoolingFeature();
461 * {@code True} if this host has processed a message, {@code false} otherwise.
463 private final AtomicBoolean sawMsg = new AtomicBoolean(false);
466 * This host's internal "DMaaP" topic.
468 private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
471 * Source that reads from the external topic and posts to the listener.
473 private TopicSource externalSource;
476 private final PolicyEngine engine = mock(PolicyEngine.class);
477 private final ListenerController controller = mock(ListenerController.class);
478 private final DroolsController drools = mock(DroolsController.class);
484 public Host(Context context) {
485 this.context = context;
487 when(controller.getName()).thenReturn(CONTROLLER1);
488 when(controller.getDrools()).thenReturn(drools);
490 // stop consuming events if the controller stops
491 when(controller.stop()).thenAnswer(args -> {
492 externalSource.unregister(controller);
496 doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
498 context.addController(controller, drools);
500 // arrange to read from the external topic
501 externalSource = new TopicSourceImpl(context, false);
505 * @return the host name
507 public String getName() {
508 return feature.getHost();
512 * Starts threads for the host so that it begins consuming from both the external
513 * "DMaaP" topic and its own internal "DMaaP" topic.
515 public void start() {
517 context.withHost(this, () -> {
519 feature.beforeStart(engine);
520 feature.afterCreate(controller);
522 // assign the queue for this host's internal topic
523 context.addInternal(getName(), msgQueue);
525 feature.beforeStart(controller);
527 // start consuming events from the external topic
528 externalSource.register(controller);
530 feature.afterStart(controller);
535 * Stops the host's threads.
538 feature.beforeStop(controller);
539 externalSource.unregister(controller);
540 feature.afterStop(controller);
544 * Offers an event to the feature, before the policy controller handles it.
549 * @return {@code true} if the event was handled, {@code false} otherwise
551 public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
552 return feature.beforeOffer(controller, protocol, topic2, event);
556 * Offers an event to the feature, after the policy controller handles it.
562 * @return {@code true} if the event was handled, {@code false} otherwise
564 public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
566 return feature.afterOffer(controller, protocol, topic, event, success);
570 * Offers an event to the feature, before the drools controller handles it.
573 * @return {@code true} if the event was handled, {@code false} otherwise
575 public boolean beforeInsert(Object fact) {
576 return feature.beforeInsert(drools, fact);
580 * Offers an event to the feature, after the drools controller handles it.
583 * @param successInsert {@code true} if it was successfully inserted by the drools
584 * controller, {@code false} otherwise
585 * @return {@code true} if the event was handled, {@code false} otherwise
587 public boolean afterInsert(Object fact, boolean successInsert) {
588 return feature.afterInsert(drools, fact, successInsert);
592 * Indicates that a message was seen for this host.
594 public void sawMessage() {
600 * @return {@code true} if a message was seen for this host, {@code false}
603 public boolean messageSeen() {
608 * @return the queue associated with this host's internal topic
610 public BlockingQueue<String> getInternalQueue() {
616 * Listener for the external topic. Simulates the actions taken by
617 * <i>AggregatedPolicyController.onTopicEvent</i>.
619 private static class MyExternalTopicListener implements Answer<Void> {
621 private final Context context;
622 private final Host host;
624 public MyExternalTopicListener(Context context, Host host) {
625 this.context = context;
630 public Void answer(InvocationOnMock args) throws Throwable {
632 CommInfrastructure commType = args.getArgument(i++);
633 String topic = args.getArgument(i++);
634 String event = args.getArgument(i++);
636 if (host.beforeOffer(commType, topic, event)) {
641 Object fact = context.decodeEvent(event);
645 context.bumpDecodeErrors();
650 if (!host.beforeInsert(fact)) {
651 // feature did not handle it so we handle it here
652 host.afterInsert(fact, result);
659 host.afterOffer(commType, topic, event, result);
665 * Sink implementation that puts a message on the queue specified by the
666 * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
667 * message is placed on all queues.
669 private static class TopicSinkImpl extends TopicImpl implements TopicSink {
671 private final Context context;
674 * Used to decode the messages so that the channel can be extracted.
676 private final Serializer serializer = new Serializer();
682 public TopicSinkImpl(Context context) {
683 this.context = context;
687 public synchronized boolean send(String message) {
693 Message msg = serializer.decodeMsg(message);
694 String channel = msg.getChannel();
696 if (Message.ADMIN.equals(channel)) {
697 // add to every queue
698 context.offerInternal(message);
701 // add to a specific queue
702 context.offerInternal(channel, message);
707 } catch (IOException e) {
708 logger.warn("could not decode message: {}", message);
709 context.bumpDecodeErrors();
716 * Source implementation that reads from a queue associated with a topic.
718 private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
720 private final String topic;
723 * Queue from which to retrieve messages.
725 private final BlockingQueue<String> queue;
728 * Manages the current consumer thread. The "first" item is used as a trigger to
729 * tell the thread to stop processing, while the "second" item is triggered <i>by
730 * the thread</i> when it completes.
732 private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
737 * @param internal {@code true} if to read from the internal topic, {@code false}
738 * to read from the external topic
740 public TopicSourceImpl(Context context, boolean internal) {
742 this.topic = INTERNAL_TOPIC;
743 this.queue = context.getCurrentHost().getInternalQueue();
746 this.topic = EXTERNAL_TOPIC;
747 this.queue = context.getExternalTopic();
752 public void setFilter(String filter) {
753 logger.info("topic filter set to: {}", filter);
757 public String getTopic() {
762 public boolean offer(String event) {
763 throw new UnsupportedOperationException("offer topic source");
767 * Starts a thread that takes messages from the queue and gives them to the
768 * listener. Stops the thread of any previously registered listener.
771 public void register(TopicListener listener) {
772 Pair<CountDownLatch, CountDownLatch> newPair = new Pair<>(new CountDownLatch(1), new CountDownLatch(1));
776 Thread thread = new Thread(() -> {
780 processMessages(newPair.first(), listener);
781 } while (!newPair.first().await(stdInterPollMs, TimeUnit.MILLISECONDS));
783 logger.info("topic source thread completed");
785 } catch (InterruptedException e) {
786 logger.warn("topic source thread aborted", e);
787 Thread.currentThread().interrupt();
789 } catch (RuntimeException e) {
790 logger.warn("topic source thread aborted", e);
793 newPair.second().countDown();
797 thread.setDaemon(true);
802 * Stops the thread of <i>any</i> currently registered listener.
805 public void unregister(TopicListener listener) {
810 * Registers a new "pair" with this source, stopping the consumer associated with
811 * any previous registration.
813 * @param newPair the new "pair", or {@code null} to unregister
815 private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
817 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
818 if (oldPair == null) {
819 if (newPair == null) {
820 // unregister was invoked twice in a row
821 logger.warn("re-unregister for topic source");
824 // no previous thread to stop
828 // need to stop the previous thread
831 oldPair.first().countDown();
833 // wait for it to stop
834 if (!oldPair.second().await(2, TimeUnit.SECONDS)) {
835 logger.warn("old topic registration is still running");
838 } catch (InterruptedException e) {
839 logger.warn("old topic registration may still be running", e);
840 Thread.currentThread().interrupt();
843 if (newPair != null) {
844 // register was invoked twice in a row
845 logger.warn("re-register for topic source");
850 * Polls for messages from the topic and offers them to the listener.
852 * @param stopped triggered if processing should stop
854 * @throws InterruptedException
856 private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
858 for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
860 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
865 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
871 * Topic implementation. Most methods just throw
872 * {@link UnsupportedOperationException}.
874 private static class TopicImpl implements Topic {
884 public String getTopic() {
885 return INTERNAL_TOPIC;
889 public CommInfrastructure getTopicCommInfrastructure() {
890 throw new UnsupportedOperationException("topic protocol");
894 public List<String> getServers() {
895 throw new UnsupportedOperationException("topic servers");
899 public String[] getRecentEvents() {
900 throw new UnsupportedOperationException("topic events");
904 public void register(TopicListener topicListener) {
905 throw new UnsupportedOperationException("register topic");
909 public void unregister(TopicListener topicListener) {
910 throw new UnsupportedOperationException("unregister topic");
914 public synchronized boolean start() {
919 public synchronized boolean stop() {
924 public synchronized void shutdown() {
929 public synchronized boolean isAlive() {
934 public boolean lock() {
935 throw new UnsupportedOperationException("lock topicink");
939 public boolean unlock() {
940 throw new UnsupportedOperationException("unlock topic");
944 public boolean isLocked() {
945 throw new UnsupportedOperationException("topic isLocked");
950 * Simulator for the feature-level factory.
952 private static class FeatureFactory extends PoolingFeature.Factory {
954 private final Context context;
960 public FeatureFactory(Context context) {
961 this.context = context;
964 * Note: do NOT extract anything from "context" at this point, because it
965 * hasn't been fully initialized yet
970 public Properties getProperties(String featName) {
971 Properties props = new Properties();
973 props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
975 props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
976 props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
977 props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
978 props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
979 props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
980 "" + stdOfflinePubWaitMs);
981 props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
982 "" + stdStartHeartbeatMs);
983 props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
984 props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
985 props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
986 "" + stdActiveHeartbeatMs);
987 props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
988 "" + stdInterHeartbeatMs);
994 public PolicyController getController(DroolsController droolsController) {
995 return context.getController(droolsController);
999 * Embeds a specializer within a property name, after the prefix.
1001 * @param propnm property name into which it should be embedded
1002 * @param spec specializer to be embedded
1003 * @return the property name, with the specializer embedded within it
1005 private String specialize(String propnm, String spec) {
1006 String suffix = propnm.substring(PREFIX.length());
1007 return PREFIX + spec + "." + suffix;
1012 * Simulator for the pooling manager factory.
1014 private static class ManagerFactory extends PoolingManagerImpl.Factory {
1017 * Used to decode events from the external topic.
1019 private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
1021 protected ObjectMapper initialValue() {
1022 return new ObjectMapper();
1027 * Used to decode events into a Map.
1029 private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {};
1035 public ManagerFactory(Context context) {
1038 * Note: do NOT extract anything from "context" at this point, because it
1039 * hasn't been fully initialized yet
1044 public boolean canDecodeEvent(DroolsController drools, String topic) {
1049 public Object decodeEvent(DroolsController drools, String topic, String event) {
1051 return mapper.get().readValue(event, typeRef);
1053 } catch (IOException e) {
1054 logger.warn("cannot decode external event", e);
1061 * Simulator for the dmaap manager factory.
1063 private static class DmaapFactory extends DmaapManager.Factory {
1065 private final Context context;
1071 public DmaapFactory(Context context) {
1072 this.context = context;
1075 * Note: do NOT extract anything from "context" at this point, because it
1076 * hasn't been fully initialized yet
1081 public List<TopicSource> getTopicSources() {
1082 return Arrays.asList(new TopicSourceImpl(context, true));
1086 public List<TopicSink> getTopicSinks() {
1087 return Arrays.asList(new TopicSinkImpl(context));
1092 * Controller that also implements the {@link TopicListener} interface.
1094 private static interface ListenerController extends PolicyController, TopicListener {
1099 * Simple function that takes no arguments and returns nothing.
1101 @FunctionalInterface
1102 private static interface VoidFunction {
1104 public void apply();