2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2020, 2024 Nordix Foundation
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling;
24 import static org.junit.jupiter.api.Assertions.assertEquals;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.when;
30 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
32 import com.google.gson.Gson;
33 import com.google.gson.JsonParseException;
34 import java.util.Deque;
35 import java.util.IdentityHashMap;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Properties;
39 import java.util.TreeMap;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.LinkedBlockingQueue;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.atomic.AtomicReference;
50 import org.apache.commons.lang3.tuple.Pair;
51 import org.junit.jupiter.api.AfterEach;
52 import org.junit.jupiter.api.BeforeEach;
53 import org.junit.jupiter.api.Test;
54 import org.mockito.invocation.InvocationOnMock;
55 import org.mockito.stubbing.Answer;
56 import org.onap.policy.common.endpoints.event.comm.Topic;
57 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
58 import org.onap.policy.common.endpoints.event.comm.TopicListener;
59 import org.onap.policy.common.endpoints.event.comm.TopicSink;
60 import org.onap.policy.common.endpoints.event.comm.TopicSource;
61 import org.onap.policy.drools.controller.DroolsController;
62 import org.onap.policy.drools.system.PolicyController;
63 import org.onap.policy.drools.system.PolicyEngine;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
68 * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
69 * its own feature object. Uses real feature objects. However, the following are not:
71 * <dt>DMaaP sources and sinks</dt>
72 * <dd>simulated using queues. There is one queue for the external topic, and one queue
73 * for each host's internal topic. Messages published to the "admin" channel are simply
74 * sent to all of the hosts' internal topic queues</dd>
75 * <dt>PolicyEngine, PolicyController, DroolsController</dt>
79 * <p>Invoke {@link #runSlow()}, before the test, to slow things down.
83 private static final Logger logger = LoggerFactory.getLogger(FeatureTest.class);
85 * Name of the topic used for inter-host communication.
87 private static final String INTERNAL_TOPIC = "my.internal.topic";
89 * Name of the topic from which "external" events "arrive".
91 private static final String EXTERNAL_TOPIC = "my.external.topic";
93 * Name of the controller.
95 private static final String CONTROLLER1 = "controller.one";
96 private static long stdReactivateWaitMs = 200;
97 private static long stdIdentificationMs = 60;
98 private static long stdStartHeartbeatMs = 60;
99 private static long stdActiveHeartbeatMs = 50;
100 private static long stdInterHeartbeatMs = 5;
101 private static long stdOfflinePubWaitMs = 2;
102 private static long stdPollMs = 2;
103 private static long stdInterPollMs = 2;
104 private static long stdEventWaitSec = 10;
106 * Used to decode events from the external topic.
108 private static final Gson mapper = new Gson();
110 * Used to identify the current context.
112 private static final ThreadLocal<Context> currentContext = new ThreadLocal<Context>();
114 * Context for the current test case.
123 public void setUp() {
132 public void tearDown() {
139 void test_SingleHost() throws Exception {
144 void test_TwoHosts() throws Exception {
149 void test_ThreeHosts() throws Exception {
153 private void run(int nmessages, int nhosts) throws Exception {
154 ctx = new Context(nmessages);
155 for (int x = 0; x < nhosts; ++x) {
159 for (int x = 0; x < nmessages; ++x) {
160 ctx.offerExternal(makeMessage(x));
162 ctx.awaitEvents(stdEventWaitSec, TimeUnit.SECONDS);
163 assertEquals(0, ctx.getDecodeErrors());
164 assertEquals(0, ctx.getRemainingEvents());
165 ctx.checkAllSawAMsg();
168 private String makeMessage(int reqnum) {
169 return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
173 * Invoke this to slow the timers down.
176 protected static void runSlow() {
177 stdReactivateWaitMs = 10000;
178 stdIdentificationMs = 10000;
179 stdStartHeartbeatMs = 15000;
180 stdActiveHeartbeatMs = 12000;
181 stdInterHeartbeatMs = 5000;
182 stdOfflinePubWaitMs = 2;
184 stdInterPollMs = 2000;
185 stdEventWaitSec = 1000;
192 * @return the decoded event, or {@code null} if it cannot be decoded
195 private static Object decodeEvent(String event) {
197 return mapper.fromJson(event, TreeMap.class);
198 } catch (JsonParseException e) {
199 logger.warn("cannot decode external event", e);
205 * Context used for a single test case.
208 private static class Context {
210 * Hosts that have been added to this context.
212 private final Deque<Host> hosts = new LinkedList<>();
214 * Maps a drools controller to its policy controller.
216 private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
218 * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
220 private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
222 * Counts the number of decode errors.
224 private final AtomicInteger numDecodeErrors = new AtomicInteger(0);
226 * Number of events we're still waiting to receive.
228 private final CountDownLatch eventCounter;
231 * The current host. Set by {@link #withHost(Host, VoidFunction)} and used by
232 * {@link #getCurrentHost()}.
235 private Host currentHost = null;
240 * @param events number of events to be processed
243 public Context(int events) {
244 eventCounter = new CountDownLatch(events);
248 * Destroys the context, stopping any hosts that remain.
251 public void destroy() {
257 * Creates and adds a new host to the context.
259 * @return the new Host
262 public Host addHost() {
263 Host host = new Host(this);
272 public void startHosts() {
273 hosts.forEach(host -> host.start());
280 public void stopHosts() {
281 hosts.forEach(host -> host.stop());
285 * Verifies that all hosts processed at least one message.
288 public void checkAllSawAMsg() {
290 for (Host host : hosts) {
291 assertTrue(host.messageSeen(), "msgs=" + msgs);
297 * Sets {@link #currentHost} to the specified host, and then invokes the given
298 * function. Resets {@link #currentHost} to {@code null} before returning.
301 * @param func function to invoke
304 public void withHost(Host host, VoidFunction func) {
311 * Offers an event to the external topic. As each host needs a copy, it is posted
312 * to each Host's queue.
317 public void offerExternal(String event) {
318 for (Host host : hosts) {
319 host.getExternalTopic().offer(event);
324 * Adds an internal channel to the set of channels.
326 * @param channel channel
327 * @param queue the channel's queue
330 public void addInternal(String channel, BlockingQueue<String> queue) {
331 channel2queue.put(channel, queue);
335 * Offers a message to all internal channels.
337 * @param message message
340 public void offerInternal(String message) {
341 channel2queue.values().forEach(queue -> queue.offer(message));
345 * Associates a controller with its drools controller.
347 * @param controller controller
348 * @param droolsController drools controller
351 public void addController(PolicyController controller, DroolsController droolsController) {
352 drools2policy.put(droolsController, controller);
358 * @param droolsController drools controller
359 * @return the controller associated with a drools controller, or {@code null} if
360 * it has no associated controller
363 public PolicyController getController(DroolsController droolsController) {
364 return drools2policy.get(droolsController);
370 * @return the number of decode errors so far
373 public int getDecodeErrors() {
374 return numDecodeErrors.get();
378 * Increments the count of decode errors.
381 public void bumpDecodeErrors() {
382 numDecodeErrors.incrementAndGet();
386 * Get remaining events.
388 * @return the number of events that haven't been processed
391 public long getRemainingEvents() {
392 return eventCounter.getCount();
396 * Adds an event to the counter.
399 public void addEvent() {
400 eventCounter.countDown();
404 * Waits, for a period of time, for all events to be processed.
408 * @return {@code true} if all events have been processed, {@code false} otherwise
409 * @throws InterruptedException throws interrupted
412 public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
413 return eventCounter.await(time, units);
419 * Simulates a single "host".
422 private static class Host {
423 private final Context context;
424 private final PoolingFeature feature;
427 * {@code True} if this host has processed a message, {@code false} otherwise.
430 private final AtomicBoolean sawMsg = new AtomicBoolean(false);
433 * This host's internal "DMaaP" topic.
436 private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
439 * Queue for the external "DMaaP" topic.
442 private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
445 * Source that reads from the external topic and posts to the listener.
448 private TopicSource externalSource;
451 private final PolicyEngine engine = mock(PolicyEngine.class);
452 private final ListenerController controller = mock(ListenerController.class);
453 private final DroolsController drools = mock(DroolsController.class);
458 * @param context context
461 public Host(Context context) {
462 this.context = context;
463 when(controller.getName()).thenReturn(CONTROLLER1);
464 when(controller.getDrools()).thenReturn(drools);
465 // stop consuming events if the controller stops
466 when(controller.stop()).thenAnswer(args -> {
467 externalSource.unregister(controller);
470 doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
471 context.addController(controller, drools);
472 // arrange to read from the external topic
473 externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
474 feature = new PoolingFeatureImpl(context);
480 * @return the host name
483 public String getName() {
484 return feature.getHost();
488 * Starts threads for the host so that it begins consuming from both the external
489 * "DMaaP" topic and its own internal "DMaaP" topic.
492 public void start() {
493 context.withHost(this, () -> {
494 feature.beforeStart(engine);
495 feature.afterCreate(controller);
496 // assign the queue for this host's internal topic
497 context.addInternal(getName(), msgQueue);
498 feature.beforeStart(controller);
499 // start consuming events from the external topic
500 externalSource.register(controller);
501 feature.afterStart(controller);
506 * Stops the host's threads.
510 feature.beforeStop(controller);
511 externalSource.unregister(controller);
512 feature.afterStop(controller);
516 * Offers an event to the feature, before the policy controller handles it.
518 * @param protocol protocol
519 * @param topic2 topic
521 * @return {@code true} if the event was handled, {@code false} otherwise
524 public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
525 return feature.beforeOffer(controller, protocol, topic2, event);
529 * Offers an event to the feature, after the policy controller handles it.
531 * @param protocol protocol
534 * @param success success
535 * @return {@code true} if the event was handled, {@code false} otherwise
538 public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
539 return feature.afterOffer(controller, protocol, topic, event, success);
543 * Offers an event to the feature, before the drools controller handles it.
546 * @return {@code true} if the event was handled, {@code false} otherwise
549 public boolean beforeInsert(Object fact) {
550 return feature.beforeInsert(drools, fact);
554 * Offers an event to the feature, after the drools controller handles it.
557 * @param successInsert {@code true} if it was successfully inserted by the drools
558 * controller, {@code false} otherwise
559 * @return {@code true} if the event was handled, {@code false} otherwise
562 public boolean afterInsert(Object fact, boolean successInsert) {
563 return feature.afterInsert(drools, fact, successInsert);
567 * Indicates that a message was seen for this host.
570 public void sawMessage() {
577 * @return {@code true} if a message was seen for this host, {@code false} otherwise
580 public boolean messageSeen() {
585 * Get internal queue.
587 * @return the queue associated with this host's internal topic
590 public BlockingQueue<String> getInternalQueue() {
596 * Listener for the external topic. Simulates the actions taken by
597 * <i>AggregatedPolicyController.onTopicEvent</i>.
600 private static class MyExternalTopicListener implements Answer<Void> {
601 private final Context context;
602 private final Host host;
604 public MyExternalTopicListener(Context context, Host host) {
605 this.context = context;
610 public Void answer(InvocationOnMock args) throws Throwable {
612 CommInfrastructure commType = args.getArgument(index++);
613 String topic = args.getArgument(index++);
614 String event = args.getArgument(index++);
615 if (host.beforeOffer(commType, topic, event)) {
619 Object fact = decodeEvent(event);
622 context.bumpDecodeErrors();
625 if (!host.beforeInsert(fact)) {
626 // feature did not handle it so we handle it here
627 host.afterInsert(fact, result);
632 host.afterOffer(commType, topic, event, result);
638 * Sink implementation that puts a message on the queue specified by the
639 * <i>channel</i> embedded within the message. If it's the "admin" channel, then the
640 * message is placed on all queues.
643 private static class TopicSinkImpl extends TopicImpl implements TopicSink {
644 private final Context context;
649 * @param context context
652 public TopicSinkImpl(Context context) {
653 this.context = context;
657 public synchronized boolean send(String message) {
662 context.offerInternal(message);
664 } catch (JsonParseException e) {
665 logger.warn("could not decode message: {}", message);
666 context.bumpDecodeErrors();
673 * Source implementation that reads from a queue associated with a topic.
676 private static class TopicSourceImpl extends TopicImpl implements TopicSource {
678 private final String topic;
680 * Queue from which to retrieve messages.
682 private final BlockingQueue<String> queue;
684 * Manages the current consumer thread. The "first" item is used as a trigger to
685 * tell the thread to stop processing, while the "second" item is triggered <i>by
686 * the thread</i> when it completes.
688 private AtomicReference<Pair<CountDownLatch, CountDownLatch>> pair = new AtomicReference<>(null);
693 * @param type topic type
694 * @param queue topic from which to read
697 public TopicSourceImpl(String type, BlockingQueue<String> queue) {
703 public String getTopic() {
708 public boolean offer(String event) {
709 throw new UnsupportedOperationException("offer topic source");
713 * Starts a thread that takes messages from the queue and gives them to the
714 * listener. Stops the thread of any previously registered listener.
718 public void register(TopicListener listener) {
719 Pair<CountDownLatch, CountDownLatch> newPair = Pair.of(new CountDownLatch(1), new CountDownLatch(1));
721 Thread thread = new Thread(() -> {
724 processMessages(newPair.getLeft(), listener);
725 } while (!newPair.getLeft().await(stdInterPollMs, TimeUnit.MILLISECONDS));
726 logger.info("topic source thread completed");
727 } catch (InterruptedException e) {
728 logger.warn("topic source thread aborted", e);
729 Thread.currentThread().interrupt();
730 } catch (RuntimeException e) {
731 logger.warn("topic source thread aborted", e);
733 newPair.getRight().countDown();
735 thread.setDaemon(true);
740 * Stops the thread of <i>any</i> currently registered listener.
744 public void unregister(TopicListener listener) {
749 * Registers a new "pair" with this source, stopping the consumer associated with
750 * any previous registration.
752 * @param newPair the new "pair", or {@code null} to unregister
755 private void reregister(Pair<CountDownLatch, CountDownLatch> newPair) {
757 Pair<CountDownLatch, CountDownLatch> oldPair = pair.getAndSet(newPair);
758 if (oldPair == null) {
759 if (newPair == null) {
760 // unregister was invoked twice in a row
761 logger.warn("re-unregister for topic source");
763 // no previous thread to stop
766 // need to stop the previous thread
768 oldPair.getLeft().countDown();
769 // wait for it to stop
770 if (!oldPair.getRight().await(2, TimeUnit.SECONDS)) {
771 logger.warn("old topic registration is still running");
773 } catch (InterruptedException e) {
774 logger.warn("old topic registration may still be running", e);
775 Thread.currentThread().interrupt();
777 if (newPair != null) {
778 // register was invoked twice in a row
779 logger.warn("re-register for topic source");
784 * Polls for messages from the topic and offers them to the listener.
786 * @param stopped triggered if processing should stop
787 * @param listener listener
788 * @throws InterruptedException throws interrupted exception
791 private void processMessages(CountDownLatch stopped, TopicListener listener) throws InterruptedException {
792 for (int x = 0; x < 5 && stopped.getCount() > 0; ++x) {
793 String msg = queue.poll(stdPollMs, TimeUnit.MILLISECONDS);
797 listener.onTopicEvent(CommInfrastructure.UEB, topic, msg);
803 * Topic implementation. Most methods just throw
804 * {@link UnsupportedOperationException}.
807 private static class TopicImpl implements Topic {
818 public String getTopic() {
819 return INTERNAL_TOPIC;
823 public String getEffectiveTopic() {
824 return INTERNAL_TOPIC;
828 public CommInfrastructure getTopicCommInfrastructure() {
829 throw new UnsupportedOperationException("topic protocol");
833 public List<String> getServers() {
834 throw new UnsupportedOperationException("topic servers");
838 public String[] getRecentEvents() {
839 throw new UnsupportedOperationException("topic events");
843 public void register(TopicListener topicListener) {
844 throw new UnsupportedOperationException("register topic");
848 public void unregister(TopicListener topicListener) {
849 throw new UnsupportedOperationException("unregister topic");
853 public synchronized boolean start() {
858 public synchronized boolean stop() {
863 public synchronized void shutdown() {
868 public synchronized boolean isAlive() {
873 public boolean lock() {
874 throw new UnsupportedOperationException("lock topicink");
878 public boolean unlock() {
879 throw new UnsupportedOperationException("unlock topic");
883 public boolean isLocked() {
884 throw new UnsupportedOperationException("topic isLocked");
889 * Feature with overrides.
892 private static class PoolingFeatureImpl extends PoolingFeature {
893 private final Context context;
898 * @param context context
901 public PoolingFeatureImpl(Context context) {
902 this.context = context;
904 * Note: do NOT extract anything from "context" at this point, because it
905 * hasn't been fully initialized yet
910 public Properties getProperties(String featName) {
911 Properties props = new Properties();
912 props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
913 props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
914 props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
915 props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
916 props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
917 props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1), "" + stdOfflinePubWaitMs);
918 props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1), "" + stdStartHeartbeatMs);
919 props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + stdReactivateWaitMs);
920 props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + stdIdentificationMs);
921 props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
922 "" + stdActiveHeartbeatMs);
923 props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1), "" + stdInterHeartbeatMs);
928 public PolicyController getController(DroolsController droolsController) {
929 return context.getController(droolsController);
933 * Embeds a specializer within a property name, after the prefix.
935 * @param propnm property name into which it should be embedded
936 * @param spec specializer to be embedded
937 * @return the property name, with the specializer embedded within it
940 private String specialize(String propnm, String spec) {
941 String suffix = propnm.substring(PREFIX.length());
942 return PREFIX + spec + "." + suffix;
946 protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
947 CountDownLatch activeLatch) {
948 currentContext.set(context);
949 return new PoolingManagerTest(host, controller, props, activeLatch);
954 * Pooling Manager with overrides.
957 private static class PoolingManagerTest extends PoolingManagerImpl {
962 * @param host the host
963 * @param controller the controller
964 * @param props the properties
965 * @param activeLatch the latch
968 public PoolingManagerTest(String host, PolicyController controller, PoolingProperties props,
969 CountDownLatch activeLatch) {
970 super(host, controller, props, activeLatch);
974 protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
975 return new TopicMessageManagerImpl(topic);
979 protected boolean canDecodeEvent(DroolsController drools, String topic) {
984 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
985 return decodeEvent(event);
990 * DMaaP Manager with overrides.
993 private static class TopicMessageManagerImpl extends TopicMessageManager {
998 * @param topic the topic
999 * @throws PoolingFeatureException if an error occurs
1002 public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
1007 protected List<TopicSource> getTopicSources() {
1009 new TopicSourceImpl(INTERNAL_TOPIC, currentContext.get().getCurrentHost().getInternalQueue()));
1013 protected List<TopicSink> getTopicSinks() {
1014 return List.of(new TopicSinkImpl(currentContext.get()));
1019 * Controller that also implements the {@link TopicListener} interface.
1022 private static interface ListenerController extends PolicyController, TopicListener {
1026 * Simple function that takes no arguments and returns nothing.
1029 @FunctionalInterface
1030 private static interface VoidFunction {