2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling;
24 import static org.junit.jupiter.api.Assertions.assertEquals;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.when;
30 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
32 import com.google.gson.Gson;
33 import com.google.gson.JsonParseException;
34 import java.util.Arrays;
35 import java.util.Deque;
36 import java.util.IdentityHashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Properties;
40 import java.util.TreeMap;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import org.junit.jupiter.api.AfterAll;
46 import org.junit.jupiter.api.AfterEach;
47 import org.junit.jupiter.api.BeforeAll;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Disabled;
50 import org.junit.jupiter.api.Test;
51 import org.mockito.invocation.InvocationOnMock;
52 import org.mockito.stubbing.Answer;
53 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
54 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
55 import org.onap.policy.common.endpoints.event.comm.TopicListener;
56 import org.onap.policy.common.endpoints.event.comm.TopicSink;
57 import org.onap.policy.common.endpoints.event.comm.TopicSource;
58 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
59 import org.onap.policy.drools.controller.DroolsController;
60 import org.onap.policy.drools.system.PolicyController;
61 import org.onap.policy.drools.system.PolicyEngine;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
/**
 * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having its
 * own feature object. Uses real feature objects, as well as real DMaaP sources and sinks.
 * However, the following are not real:
 * <dl>
 * <dt>PolicyEngine, PolicyController, DroolsController</dt>
 * <dd>mocked</dd>
 * </dl>
 *
 * <p>The following fields must be set before executing this:
 * <ul>
 * <li>UEB_SERVERS</li>
 * <li>INTERNAL_TOPIC</li>
 * <li>EXTERNAL_TOPIC</li>
 * </ul>
 */
74 public class EndToEndFeatureTest {
76 private static final Logger logger = LoggerFactory.getLogger(EndToEndFeatureTest.class);
79 * UEB servers for both internal & external topics.
81 private static final String UEB_SERVERS = "ueb-server";
84 * Name of the topic used for inter-host communication.
86 private static final String INTERNAL_TOPIC = "internal-topic";
89 * Name of the topic from which "external" events "arrive".
91 private static final String EXTERNAL_TOPIC = "external-topic";
94 * Consumer group to use when polling the external topic.
96 private static final String EXTERNAL_GROUP = EndToEndFeatureTest.class.getName();
99 * Name of the controller.
101 private static final String CONTROLLER1 = "controller.one";
104 * Maximum number of items to fetch from DMaaP in a single poll.
106 private static final String FETCH_LIMIT = "5";
108 private static final long STD_REACTIVATE_WAIT_MS = 10000;
109 private static final long STD_IDENTIFICATION_MS = 10000;
110 private static final long STD_START_HEARTBEAT_MS = 15000;
111 private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
112 private static final long STD_INTER_HEARTBEAT_MS = 5000;
113 private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
114 private static final long EVENT_WAIT_SEC = 15;
117 * Used to decode events from the external topic.
119 private static final Gson mapper = new Gson();
122 * Used to identify the current host.
124 private static final ThreadLocal<Host> currentHost = new ThreadLocal<Host>();
127 * Sink for external DMaaP topic.
129 private static TopicSink externalSink;
132 * Sink for internal DMaaP topic.
134 private static TopicSink internalSink;
137 * Context for the current test case.
142 * Setup before class.
146 public static void setUpBeforeClass() {
147 externalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
148 externalSink.start();
150 internalSink = TopicEndpointManager.getManager().addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
151 internalSink.start();
155 * Tear down after class.
159 public static void tearDownAfterClass() {
168 public void setUp() {
176 public void tearDown() {
183 * This test should only be run manually, after configuring all the fields,
184 * thus it is ignored.
188 public void test_SingleHost() throws Exception { // NOSONAR
193 * This test should only be run manually, after configuring all the fields,
194 * thus it is ignored.
198 public void test_TwoHosts() throws Exception { // NOSONAR
203 * This test should only be run manually, after configuring all the fields,
204 * thus it is ignored.
208 public void test_ThreeHosts() throws Exception { // NOSONAR
212 private void run(int nmessages, int nhosts) throws Exception {
213 ctx = new Context(nmessages);
215 for (int x = 0; x < nhosts; ++x) {
220 ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
222 for (int x = 0; x < nmessages; ++x) {
223 ctx.offerExternal(makeMessage(x));
226 ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
228 assertEquals(0, ctx.getDecodeErrors());
229 assertEquals(0, ctx.getRemainingEvents());
230 ctx.checkAllSawAMsg();
233 private String makeMessage(int reqnum) {
234 return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
237 private static Properties makeSinkProperties(String topic) {
238 Properties props = new Properties();
240 props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS, topic);
242 props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
243 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
244 props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
245 + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
246 props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
247 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
252 private static Properties makeSourceProperties(String topic) {
253 Properties props = new Properties();
255 props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
257 props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
258 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
259 props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
260 + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
261 props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
262 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
264 if (EXTERNAL_TOPIC.equals(topic)) {
265 // consumer group is a constant
266 props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
267 + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP);
269 // consumer instance is generated by the BusConsumer code
272 // else internal topic: feature populates info for internal topic
281 * @return the decoded event, or {@code null} if it cannot be decoded
283 private static Object decodeEvent(String event) {
285 return mapper.fromJson(event, TreeMap.class);
287 } catch (JsonParseException e) {
288 logger.warn("cannot decode external event", e);
294 * Context used for a single test case.
296 private static class Context {
299 * Hosts that have been added to this context.
301 private final Deque<Host> hosts = new LinkedList<>();
304 * Maps a drools controller to its policy controller.
306 private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
309 * Counts the number of decode errors.
311 private final AtomicInteger decodeErrors = new AtomicInteger(0);
314 * Number of events we're still waiting to receive.
316 private final CountDownLatch eventCounter;
/**
 * Constructs the context, initializing the latch that tracks outstanding events.
 *
 * @param events number of events to be processed
 */
public Context(int events) {
    this.eventCounter = new CountDownLatch(events);
}
328 * Destroys the context, stopping any hosts that remain.
330 public void destroy() {
336 * Creates and adds a new host to the context.
338 * @return the new Host
340 public Host addHost() {
341 Host host = new Host(this);
350 public void startHosts() {
351 hosts.forEach(host -> host.start());
357 public void stopHosts() {
358 hosts.forEach(host -> host.stop());
362 * Verifies that all hosts processed at least one message.
364 public void checkAllSawAMsg() {
366 for (Host host : hosts) {
367 assertTrue(host.messageSeen(), "msgs=" + msgs);
373 * Offers an event to the external topic.
377 public void offerExternal(String event) {
378 externalSink.send(event);
382 * Associates a controller with its drools controller.
384 * @param controller controller
385 * @param droolsController drools controller
387 public void addController(PolicyController controller, DroolsController droolsController) {
388 drools2policy.put(droolsController, controller);
394 * @param droolsController drools controller
395 * @return the controller associated with a drools controller, or {@code null} if it has no
396 * associated controller
398 public PolicyController getController(DroolsController droolsController) {
399 return drools2policy.get(droolsController);
405 * @return the number of decode errors so far
407 public int getDecodeErrors() {
408 return decodeErrors.get();
412 * Increments the count of decode errors.
414 public void bumpDecodeErrors() {
415 decodeErrors.incrementAndGet();
419 * Get remaining events.
421 * @return the number of events that haven't been processed
423 public long getRemainingEvents() {
424 return eventCounter.getCount();
428 * Adds an event to the counter.
430 public void addEvent() {
431 eventCounter.countDown();
435 * Waits, for a period of time, for all events to be processed.
439 * @return {@code true} if all events have been processed, {@code false} otherwise
440 * @throws InterruptedException throws interrupted exception
442 public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
443 return eventCounter.await(time, units);
447 * Waits, for a period of time, for all hosts to enter the Active state.
449 * @param timeMs maximum time to wait, in milliseconds
450 * @throws InterruptedException throws interrupted exception
452 public void awaitAllActive(long timeMs) throws InterruptedException {
453 long tend = timeMs + System.currentTimeMillis();
455 for (Host host : hosts) {
456 long tremain = Math.max(0, tend - System.currentTimeMillis());
457 assertTrue(host.awaitActive(tremain));
463 * Simulates a single "host".
465 private static class Host {
467 private final PoolingFeature feature;
470 * {@code True} if this host has processed a message, {@code false} otherwise.
472 private final AtomicBoolean sawMsg = new AtomicBoolean(false);
474 private final TopicSource externalSource;
475 private final TopicSource internalSource;
478 private final PolicyEngine engine = mock(PolicyEngine.class);
479 private final ListenerController controller = mock(ListenerController.class);
480 private final DroolsController drools = mock(DroolsController.class);
485 * @param context context
487 public Host(Context context) {
489 when(controller.getName()).thenReturn(CONTROLLER1);
490 when(controller.getDrools()).thenReturn(drools);
492 externalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(EXTERNAL_TOPIC))
494 internalSource = TopicEndpointManager.getManager().addTopicSources(makeSourceProperties(INTERNAL_TOPIC))
497 // stop consuming events if the controller stops
498 when(controller.stop()).thenAnswer(args -> {
499 externalSource.unregister(controller);
503 doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
505 context.addController(controller, drools);
507 feature = new PoolingFeatureImpl(context, this);
511 * Waits, for a period of time, for the host to enter the Active state.
513 * @param timeMs time to wait, in milliseconds
514 * @return {@code true} if the host entered the Active state within the given amount of
515 * time, {@code false} otherwise
516 * @throws InterruptedException throws interrupted exception
518 public boolean awaitActive(long timeMs) throws InterruptedException {
519 return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
523 * Starts threads for the host so that it begins consuming from both the external "DMaaP"
524 * topic and its own internal "DMaaP" topic.
526 public void start() {
527 feature.beforeStart(engine);
528 feature.afterCreate(controller);
530 feature.beforeStart(controller);
532 // start consuming events from the external topic
533 externalSource.register(controller);
535 feature.afterStart(controller);
539 * Stops the host's threads.
542 feature.beforeStop(controller);
543 externalSource.unregister(controller);
544 feature.afterStop(controller);
548 * Offers an event to the feature, before the policy controller handles it.
550 * @param protocol protocol
551 * @param topic2 topic
553 * @return {@code true} if the event was handled, {@code false} otherwise
555 public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
556 return feature.beforeOffer(controller, protocol, topic2, event);
560 * Offers an event to the feature, after the policy controller handles it.
562 * @param protocol protocol
565 * @param success success
566 * @return {@code true} if the event was handled, {@code false} otherwise
568 public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
570 return feature.afterOffer(controller, protocol, topic, event, success);
574 * Offers an event to the feature, before the drools controller handles it.
577 * @return {@code true} if the event was handled, {@code false} otherwise
579 public boolean beforeInsert(Object fact) {
580 return feature.beforeInsert(drools, fact);
584 * Offers an event to the feature, after the drools controller handles it.
587 * @param successInsert {@code true} if it was successfully inserted by the drools
588 * controller, {@code false} otherwise
589 * @return {@code true} if the event was handled, {@code false} otherwise
591 public boolean afterInsert(Object fact, boolean successInsert) {
592 return feature.afterInsert(drools, fact, successInsert);
596 * Indicates that a message was seen for this host.
598 public void sawMessage() {
605 * @return {@code true} if a message was seen for this host, {@code false} otherwise
607 public boolean messageSeen() {
613 * Listener for the external topic. Simulates the actions taken by
614 * <i>AggregatedPolicyController.onTopicEvent</i>.
616 private static class MyExternalTopicListener implements Answer<Void> {
618 private final Context context;
619 private final Host host;
621 public MyExternalTopicListener(Context context, Host host) {
622 this.context = context;
627 public Void answer(InvocationOnMock args) throws Throwable {
629 CommInfrastructure commType = args.getArgument(index++);
630 String topic = args.getArgument(index++);
631 String event = args.getArgument(index++);
633 if (host.beforeOffer(commType, topic, event)) {
638 Object fact = decodeEvent(event);
642 context.bumpDecodeErrors();
647 if (!host.beforeInsert(fact)) {
648 // feature did not handle it so we handle it here
649 host.afterInsert(fact, result);
656 host.afterOffer(commType, topic, event, result);
662 * Feature with overrides.
664 private static class PoolingFeatureImpl extends PoolingFeature {
666 private final Context context;
667 private final Host host;
672 * @param context context
674 public PoolingFeatureImpl(Context context, Host host) {
675 this.context = context;
679 * Note: do NOT extract anything from "context" at this point, because it hasn't been
680 * fully initialized yet
685 public Properties getProperties(String featName) {
686 Properties props = new Properties();
688 props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
690 props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
691 props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
692 props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
693 props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
694 props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
695 "" + STD_OFFLINE_PUB_WAIT_MS);
696 props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
697 "" + STD_START_HEARTBEAT_MS);
698 props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
699 props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
700 props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
701 "" + STD_ACTIVE_HEARTBEAT_MS);
702 props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
703 "" + STD_INTER_HEARTBEAT_MS);
705 props.putAll(makeSinkProperties(INTERNAL_TOPIC));
706 props.putAll(makeSourceProperties(INTERNAL_TOPIC));
712 public PolicyController getController(DroolsController droolsController) {
713 return context.getController(droolsController);
717 * Embeds a specializer within a property name, after the prefix.
719 * @param propnm property name into which it should be embedded
720 * @param spec specializer to be embedded
721 * @return the property name, with the specializer embedded within it
723 private String specialize(String propnm, String spec) {
724 String suffix = propnm.substring(PREFIX.length());
725 return PREFIX + spec + "." + suffix;
729 protected PoolingManagerImpl makeManager(String hostName, PolicyController controller, PoolingProperties props,
730 CountDownLatch activeLatch) {
733 * Set this before creating the test, because the test's superclass
734 * constructor uses it before the test object has a chance to store it.
736 currentHost.set(host);
738 return new PoolingManagerTest(hostName, controller, props, activeLatch);
743 * Pooling Manager with overrides.
745 private static class PoolingManagerTest extends PoolingManagerImpl {
/**
 * Constructs the manager; all arguments are handed straight to the superclass.
 *
 * @param hostName the host
 * @param controller the controller
 * @param props the properties
 * @param activeLatch the latch
 */
public PoolingManagerTest(String hostName, PolicyController controller,
                PoolingProperties props, CountDownLatch activeLatch) {
    super(hostName, controller, props, activeLatch);
}
762 protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
763 return new TopicMessageManagerImpl(topic);
767 protected boolean canDecodeEvent(DroolsController drools, String topic) {
772 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
773 return decodeEvent(event);
778 * DMaaP Manager with overrides.
780 private static class TopicMessageManagerImpl extends TopicMessageManager {
785 * @param topic the topic
786 * @throws PoolingFeatureException if an error occurs
788 public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
793 protected List<TopicSource> getTopicSources() {
794 Host host = currentHost.get();
795 return Arrays.asList(host.internalSource, host.externalSource);
799 protected List<TopicSink> getTopicSinks() {
800 return Arrays.asList(internalSink, externalSink);
805 * Controller that also implements the {@link TopicListener} interface.
807 private static interface ListenerController extends PolicyController, TopicListener {