2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2020 Nordix Foundation
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.when;
30 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
32 import com.google.gson.Gson;
33 import com.google.gson.JsonParseException;
34 import java.util.Arrays;
35 import java.util.Deque;
36 import java.util.IdentityHashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Properties;
40 import java.util.TreeMap;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentMap;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.LinkedBlockingQueue;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicInteger;
49 import java.util.concurrent.atomic.AtomicReference;
51 import org.apache.commons.lang3.tuple.Pair;
52 import org.junit.After;
53 import org.junit.Before;
54 import org.junit.Test;
55 import org.mockito.invocation.InvocationOnMock;
56 import org.mockito.stubbing.Answer;
57 import org.onap.policy.common.endpoints.event.comm.Topic;
58 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
59 import org.onap.policy.common.endpoints.event.comm.TopicListener;
60 import org.onap.policy.common.endpoints.event.comm.TopicSink;
61 import org.onap.policy.common.endpoints.event.comm.TopicSource;
62 import org.onap.policy.drools.controller.DroolsController;
63 import org.onap.policy.drools.system.PolicyController;
64 import org.onap.policy.drools.system.PolicyEngine;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
69 * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
70 * its own feature object. Uses real feature objects. However, the following are not:
72 * <dt>DMaaP sources and sinks</dt>
73 * <dd>simulated using queues. There is one queue for the external topic, and one queue
74 * for each host's internal topic. Messages published to the "admin" channel are simply
75 * sent to all of the hosts' internal topic queues</dd>
76 * <dt>PolicyEngine, PolicyController, DroolsController</dt>
80 * <p>Invoke {@link #runSlow()}, before the test, to slow things down.
83 public class FeatureTest {
84 private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
86 * Name of the topic used for inter-host communication.
88 private static final String INTERNAL_TOPIC = "my.internal.topic";
90 * Name of the topic from which "external" events "arrive".
92 private static final String EXTERNAL_TOPIC = "my.external.topic";
94 * Name of the controller.
96 private static final String CONTROLLER1 = "controller.one";
97 private static long stdReactivateWaitMs = 200;
98 private static long stdIdentificationMs = 60;
99 private static long stdStartHeartbeatMs = 60;
100 private static long stdActiveHeartbeatMs = 50;
101 private static long stdInterHeartbeatMs = 5;
102 private static long stdOfflinePubWaitMs = 2;
103 private static long stdPollMs = 2;
104 private static long stdInterPollMs = 2;
105 private static long stdEventWaitSec = 10;
107 * Used to decode events from the external topic.
109 private static final Gson mapper = new Gson();
111 * Used to identify the current context.
113 private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
115 * Context for the current test case.
123 public void setUp() {
131 public void tearDown() {
138 public void test_SingleHost() throws Exception {
143 public void test_TwoHosts() throws Exception {
148 public void test_ThreeHosts() throws Exception {
152 private void run(int nmessages, int nhosts) throws Exception {
153 ctx = new Context(nmessages);
154 for (int x = 0; x < nhosts; ++x) {
158 for (int x = 0; x < nmessages; ++x) {
159 ctx.offerExternal(makeMessage(x));
161 ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
162 assertEquals(0, ctx.getDecodeErrors());
163 assertEquals(0, ctx.getRemainingEvents());
164 ctx.checkAllSawAMsg();
167 private String makeMessage(int reqnum) {
168 return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
171 * Invoke this to slow the timers down.
174 protected static void runSlow() {
175 stdReactivateWaitMs = 10000;
176 stdIdentificationMs = 10000;
177 stdStartHeartbeatMs = 15000;
178 stdActiveHeartbeatMs = 12000;
179 stdInterHeartbeatMs = 5000;
180 stdOfflinePubWaitMs = 2;
182 stdInterPollMs = 2000;
183 stdEventWaitSec = 1000;
189 * @return the decoded event, or {@code null} if it cannot be decoded
192 private static Object decodeEvent(String event) {
194 return mapper.fromJson(event, TreeMap.class);
195 } catch (JsonParseException e) {
196 logger.warn("cannot decode external event", e);
201 * Context used for a single test case.
204 private static class Context {
206 * Hosts that have been added to this context.
208 private final Deque<Host> hosts = new LinkedList<>();
210 * Maps a drools controller to its policy controller.
212 private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
214 * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
216 private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
218 * Counts the number of decode errors.
220 private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
222 * Number of events we're still waiting to receive.
224 private final CountDownLatch eventCounter;
226 * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
227 * {@link #getCurrentHost()}.
229 private Host currentHost = null;
234 * @param events number of events to be processed
237 public Context(int events) {
238 eventCounter = new CountDownLatch(events);
241 * Destroys the context, stopping any hosts that remain.
244 public void destroy() {
250 * Creates and adds a new host to the context.
252 * @return the new Host
255 public Host addHost() {
256 Host host = new Host(this);
265 public void startHosts() {
266 hosts.forEach(host -> host.start());
273 public void stopHosts() {
274 hosts.forEach(host -> host.stop());
278 * Verifies that all hosts processed at least one message.
281 public void checkAllSawAMsg() {
283 for (Host host : hosts) {
284 assertTrue("msgs=" + msgs, host.messageSeen());
290 * Sets {@link #currentHost} to the specified host, and then invokes the given
291 * function. Resets {@link #currentHost} to {@code null} before returning.
294 * @param func function to invoke
297 public void withHost(Host host, VoidFunction func) {
304 * Offers an event to the external topic. As each host needs a copy, it is posted
305 * to each Host's queue.
310 public void offerExternal(String event) {
311 for (Host host : hosts) {
312 host.getExternalTopic().offer(event);
317 * Adds an internal channel to the set of channels.
319 * @param channel channel
320 * @param queue the channel's queue
323 public void addInternal(String channel, BlockingQueue<String> queue) {
324 channel2queue.put(channel, queue);
328 * Offers a message to all internal channels.
330 * @param message message
333 public void offerInternal(String message) {
334 channel2queue.values().forEach(queue -> queue.offer(message));
338 * Associates a controller with its drools controller.
340 * @param controller controller
341 * @param droolsController drools controller
344 public void addController(PolicyController controller, DroolsController droolsController) {
345 drools2policy.put(droolsController, controller);
351 * @param droolsController drools controller
352 * @return the controller associated with a drools controller, or {@code null} if
353 * it has no associated controller
356 public PolicyController getController(DroolsController droolsController) {
357 return drools2policy.get(droolsController);
363 * @return the number of decode errors so far
366 public int getDecodeErrors() {
367 return numDecodeErrors.get();
371 * Increments the count of decode errors.
374 public void bumpDecodeErrors() {
375 numDecodeErrors.incrementAndGet();
379 * Get remaining events.
381 * @return the number of events that haven't been processed
384 public long getRemainingEvents() {
385 return eventCounter.getCount();
389 * Adds an event to the counter.
392 public void addEvent() {
393 eventCounter.countDown();
397 * Waits, for a period of time, for all events to be processed.
401 * @return {@code true} if all events have been processed, {@code false} otherwise
402 * @throws InterruptedException throws interrupted
405 public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
406 return eventCounter.await(time, units);
410 * Gets the current host, provided this is used from within a call to
411 * {@link #withHost(Host, VoidFunction)}.
413 * @return the current host, or {@code null} if there is no current host
416 public Host getCurrentHost() {
422 * Simulates a single "host".
425 private static class Host {
426 private final Context context;
427 private final PoolingFeature feature;
430 * {@code True} if this host has processed a message, {@code false} otherwise.
433 private final AtomicBoolean sawMsg = new AtomicBoolean(false);
436 * This host's internal "DMaaP" topic.
439 private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
442 * Queue for the external "DMaaP" topic.
445 private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
448 * Source that reads from the external topic and posts to the listener.
451 private TopicSource externalSource;
454 private final PolicyEngine engine = mock(PolicyEngine.class);
455 private final ListenerController controller = mock(ListenerController.class);
456 private final DroolsController drools = mock(DroolsController.class);
461 * @param context context
464 public Host(Context context) {
465 this.context = context;
466 when(controller.getName()).thenReturn(CONTROLLER1);
467 when(controller.getDrools()).thenReturn(drools);
468 // stop consuming events if the controller stops
469 when(controller.stop()).thenAnswer(args -> {
470 externalSource.unregister(controller);
473 doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
474 context.addController(controller, drools);
475 // arrange to read from the external topic
476 externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
477 feature = new PoolingFeatureImpl(context);
483 * @return the host name
486 public String getName() {
487 return feature.getHost();
491 * Starts threads for the host so that it begins consuming from both the external
492 * "DMaaP" topic and its own internal "DMaaP" topic.
495 public void start() {
496 context.withHost(this, () -> {
497 feature.beforeStart(engine);
498 feature.afterCreate(controller);
499 // assign the queue for this host's internal topic
500 context.addInternal(getName(), msgQueue);
501 feature.beforeStart(controller);
502 // start consuming events from the external topic
503 externalSource.register(controller);
504 feature.afterStart(controller);
509 * Stops the host's threads.
513 feature.beforeStop(controller);
514 externalSource.unregister(controller);
515 feature.afterStop(controller);
519 * Offers an event to the feature, before the policy controller handles it.
521 * @param protocol protocol
522 * @param topic2 topic
524 * @return {@code true} if the event was handled, {@code false} otherwise
527 public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
528 return feature.beforeOffer(controller, protocol, topic2, event);
532 * Offers an event to the feature, after the policy controller handles it.
534 * @param protocol protocol
537 * @param success success
538 * @return {@code true} if the event was handled, {@code false} otherwise
541 public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
542 return feature.afterOffer(controller, protocol, topic, event, success);
546 * Offers an event to the feature, before the drools controller handles it.
549 * @return {@code true} if the event was handled, {@code false} otherwise
552 public boolean beforeInsert(Object fact) {
553 return feature.beforeInsert(drools, fact);
557 * Offers an event to the feature, after the drools controller handles it.
560 * @param successInsert {@code true} if it was successfully inserted by the drools
561 * controller, {@code false} otherwise
562 * @return {@code true} if the event was handled, {@code false} otherwise
565 public boolean afterInsert(Object fact, boolean successInsert) {
566 return feature.afterInsert(drools, fact, successInsert);
570 * Indicates that a message was seen for this host.
573 public void sawMessage() {
580 * @return {@code true} if a message was seen for this host, {@code false}
584 public boolean messageSeen() {
589 * Get internal queue.
591 * @return the queue associated with this host's internal topic
594 public BlockingQueue<String> getInternalQueue() {
600 * Listener for the external topic. Simulates the actions taken by
601 * <i>AggregatedPolicyController.onTopicEvent</i>.
604 private static class MyExternalTopicListener implements Answer<Void> {
605 private final Context context;
606 private final Host host;
608 public MyExternalTopicListener(Context context, Host host) {
609 this.context = context;
614 public Void answer(InvocationOnMock args) throws Throwable {
616 CommInfrastructure commType = args.getArgument(index++);
617 String topic = args.getArgument(index++);
618 String event = args.getArgument(index++);
619 if (host.beforeOffer(commType, topic, event)) {
623 Object fact = decodeEvent(event);
626 context.bumpDecodeErrors();
629 if (!host.beforeInsert(fact)) {
630 // feature did not handle it so we handle it here
631 host.afterInsert(fact, result);
636 host.afterOffer(commType, topic, event, result);
642 * Sink implementation that puts a message on the queue specified by the
643 * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
644 * message is placed on all queues.
647 private static class TopicSinkImpl extends TopicImpl implements TopicSink {
648 private final Context context;
653 * @param context context
656 public TopicSinkImpl(Context context) {
657 this.context = context;
661 public synchronized boolean send(String message) {
666 context.offerInternal(message);
668 } catch (JsonParseException e) {
669 logger.warn("could not decode message: {}", message);
670 context.bumpDecodeErrors();
677 * Source implementation that reads from a queue associated with a topic.
680 private static class TopicSourceImpl extends TopicImpl implements TopicSource {
682 private final String topic;
684 * Queue from which to retrieve messages.
686 private final BlockingQueue<String> queue;
688 * Manages the current consumer thread. The "first" item is used as a trigger to
689 * tell the thread to stop processing, while the "second" item is triggered <i>by
690 * the thread</i> when it completes.
692 private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
697 * @param type topic type
698 * @param queue topic from which to read
701 public TopicSourceImpl(String type, BlockingQueue<String> queue) {
707 public String getTopic() {
712 public boolean offer(String event) {
713 throw new UnsupportedOperationException("offer topic source");
717 * Starts a thread that takes messages from the queue and gives them to the
718 * listener. Stops the thread of any previously registered listener.
722 public void register(TopicListener listener) {
723 Pair<CountDownLatch, CountDownLatch> newPair = Pair.of(new CountDownLatch(1), new CountDownLatch(1));
725 Thread thread = new Thread(() -> {
728 processMessages(newPair.getLeft(), listener);
729 } while (!newPair.getLeft().await(stdInterPollMs, TimeUnit.MILLISECONDS));
730 logger.info("topic source thread completed");
731 } catch (InterruptedException e) {
732 logger.warn("topic source thread aborted", e);
733 Thread.currentThread().interrupt();
734 } catch (RuntimeException e) {
735 logger.warn("topic source thread aborted", e);
737 newPair.getRight().countDown();
739 thread.setDaemon(true);
744 * Stops the thread of <i>any</i> currently registered listener.
748 public void unregister(TopicListener listener) {
753 * Registers a new "pair" with this source, stopping the consumer associated with
754 * any previous registration.
756 * @param newPair the new "pair", or {@code null} to unregister
759 private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
761 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
762 if (oldPair == null) {
763 if (newPair == null) {
764 // unregister was invoked twice in a row
765 logger.warn("re-unregister for topic source");
767 // no previous thread to stop
770 // need to stop the previous thread
772 oldPair.getLeft().countDown();
773 // wait for it to stop
774 if (!oldPair.getRight().await(2, TimeUnit.SECONDS)) {
775 logger.warn("old topic registration is still running");
777 } catch (InterruptedException e) {
778 logger.warn("old topic registration may still be running", e);
779 Thread.currentThread().interrupt();
781 if (newPair != null) {
782 // register was invoked twice in a row
783 logger.warn("re-register for topic source");
788 * Polls for messages from the topic and offers them to the listener.
790 * @param stopped triggered if processing should stop
791 * @param listener listener
792 * @throws InterruptedException throws interrupted exception
795 private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
796 for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
797 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
801 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
807 * Topic implementation. Most methods just throw
808 * {@link UnsupportedOperationException}.
811 private static class TopicImpl implements Topic {
822 public String getTopic() {
823 return INTERNAL_TOPIC;
827 public String getEffectiveTopic() {
828 return INTERNAL_TOPIC;
832 public CommInfrastructure getTopicCommInfrastructure() {
833 throw new UnsupportedOperationException("topic protocol");
837 public List<String> getServers() {
838 throw new UnsupportedOperationException("topic servers");
842 public String[] getRecentEvents() {
843 throw new UnsupportedOperationException("topic events");
847 public void register(TopicListener topicListener) {
848 throw new UnsupportedOperationException("register topic");
852 public void unregister(TopicListener topicListener) {
853 throw new UnsupportedOperationException("unregister topic");
857 public synchronized boolean start() {
862 public synchronized boolean stop() {
867 public synchronized void shutdown() {
872 public synchronized boolean isAlive() {
877 public boolean lock() {
878 throw new UnsupportedOperationException("lock topicink");
882 public boolean unlock() {
883 throw new UnsupportedOperationException("unlock topic");
887 public boolean isLocked() {
888 throw new UnsupportedOperationException("topic isLocked");
893 * Feature with overrides.
896 private static class PoolingFeatureImpl extends PoolingFeature {
897 private final Context context;
902 * @param context context
905 public PoolingFeatureImpl(Context context) {
906 this.context = context;
908 * Note: do NOT extract anything from "context" at this point, because it
909 * hasn't been fully initialized yet
914 public Properties getProperties(String featName) {
915 Properties props = new Properties();
916 props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
917 props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
918 props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
919 props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
920 props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
921 props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
922 "" + stdOfflinePubWaitMs);
923 props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
924 "" + stdStartHeartbeatMs);
925 props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
926 props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
927 props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
928 "" + stdActiveHeartbeatMs);
929 props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
930 "" + stdInterHeartbeatMs);
935 public PolicyController getController(DroolsController droolsController) {
936 return context.getController(droolsController);
940 * Embeds a specializer within a property name, after the prefix.
942 * @param propnm property name into which it should be embedded
943 * @param spec specializer to be embedded
944 * @return the property name, with the specializer embedded within it
947 private String specialize(String propnm, String spec) {
948 String suffix = propnm.substring(PREFIX.length());
949 return PREFIX + spec + "." + suffix;
953 protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
954 CountDownLatch activeLatch) {
955 currentContext.set(context);
956 return new PoolingManagerTest(host, controller, props, activeLatch);
961 * Pooling Manager with overrides.
964 private static class PoolingManagerTest extends PoolingManagerImpl {
969 * @param host the host
970 * @param controller the controller
971 * @param props the properties
972 * @param activeLatch the latch
975 public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
976 CountDownLatch activeLatch) {
977 super(host, controller, props, activeLatch);
981 protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
982 return new DmaapManagerImpl(topic);
986 protected boolean canDecodeEvent(DroolsController drools, String topic) {
991 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
992 return decodeEvent(event);
997 * DMaaP Manager with overrides.
1000 private static class DmaapManagerImpl extends DmaapManager {
1005 * @param topic the topic
1006 * @throws PoolingFeatureException if an error occurs
1009 public DmaapManagerImpl(String topic) throws PoolingFeatureException {
1014 protected List<TopicSource> getTopicSources() {
1015 return Arrays.asList(new TopicSourceImpl(INTERNAL_TOPIC,
1016 currentContext.get().getCurrentHost().getInternalQueue()));
1020 protected List<TopicSink> getTopicSinks() {
1021 return Arrays.asList(new TopicSinkImpl(currentContext.get()));
1026 * Controller that also implements the {@link TopicListener} interface.
1029 private static interface ListenerController extends PolicyController, TopicListener {
1033 * Simple function that takes no arguments and returns nothing.
1036 @FunctionalInterface
1037 private static interface VoidFunction {