2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
30 import java.io.IOException;
31 import java.util.Deque;
32 import java.util.IdentityHashMap;
33 import java.util.LinkedList;
34 import java.util.Properties;
35 import java.util.TreeMap;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import org.junit.After;
41 import org.junit.AfterClass;
42 import org.junit.Before;
43 import org.junit.BeforeClass;
44 import org.junit.Ignore;
45 import org.junit.Test;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.mockito.stubbing.Answer;
48 import org.onap.policy.drools.controller.DroolsController;
49 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
50 import org.onap.policy.drools.event.comm.TopicEndpoint;
51 import org.onap.policy.drools.event.comm.TopicListener;
52 import org.onap.policy.drools.event.comm.TopicSink;
53 import org.onap.policy.drools.event.comm.TopicSource;
54 import org.onap.policy.drools.properties.PolicyProperties;
55 import org.onap.policy.drools.system.PolicyController;
56 import org.onap.policy.drools.system.PolicyEngine;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import com.fasterxml.jackson.core.type.TypeReference;
60 import com.fasterxml.jackson.databind.ObjectMapper;
63 * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
64 * its own feature object. Uses real feature objects, as well as real DMaaP sources and
65 * sinks. However, the following are not:
67 * <dt>PolicyEngine, PolicyController, DroolsController</dt>
72 * The following fields must be set before executing this:
74 * <li>UEB_SERVERS</li>
75 * <li>INTERNAL_TOPIC</li>
76 * <li>EXTERNAL_TOPIC</li>
79 public class FeatureTest2 {
81 private static final Logger logger = LoggerFactory.getLogger(FeatureTest2.class);
84 * UEB servers for both internal & external topics.
86 private static final String UEB_SERVERS = "";
89 * Name of the topic used for inter-host communication.
91 private static final String INTERNAL_TOPIC = "";
94 * Name of the topic from which "external" events "arrive".
96 private static final String EXTERNAL_TOPIC = "";
99 * Consumer group to use when polling the external topic.
101 private static final String EXTERNAL_GROUP = FeatureTest2.class.getName();
104 * Name of the controller.
106 private static final String CONTROLLER1 = "controller.one";
109 * Maximum number of items to fetch from DMaaP in a single poll.
111 private static final String FETCH_LIMIT = "5";
113 private static final long STD_REACTIVATE_WAIT_MS = 10000;
114 private static final long STD_IDENTIFICATION_MS = 10000;
115 private static final long STD_START_HEARTBEAT_MS = 15000;
116 private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
117 private static final long STD_INTER_HEARTBEAT_MS = 5000;
118 private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
119 private static final long EVENT_WAIT_SEC = 15;
121 // these are saved and restored on exit from this test class
122 private static PoolingFeature.Factory saveFeatureFactory;
123 private static PoolingManagerImpl.Factory saveManagerFactory;
126 * Sink for external DMaaP topic.
128 private static TopicSink externalSink;
131 * Context for the current test case.
137 public static void setUpBeforeClass() {
138 saveFeatureFactory = PoolingFeature.getFactory();
139 saveManagerFactory = PoolingManagerImpl.getFactory();
141 Properties props = makeSinkProperties(EXTERNAL_TOPIC);
142 externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0);
143 externalSink.start();
147 public static void tearDownAfterClass() {
148 PoolingFeature.setFactory(saveFeatureFactory);
149 PoolingManagerImpl.setFactory(saveManagerFactory);
155 public void setUp() {
160 public void tearDown() {
168 public void test_SingleHost() throws Exception {
174 public void test_TwoHosts() throws Exception {
180 public void test_ThreeHosts() throws Exception {
184 private void run(int nmessages, int nhosts) throws Exception {
185 ctx = new Context(nmessages);
187 for (int x = 0; x < nhosts; ++x) {
192 ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
194 for (int x = 0; x < nmessages; ++x) {
195 ctx.offerExternal(makeMessage(x));
198 ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
200 assertEquals(0, ctx.getDecodeErrors());
201 assertEquals(0, ctx.getRemainingEvents());
202 ctx.checkAllSawAMsg();
205 private String makeMessage(int reqnum) {
206 return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
209 private static Properties makeSinkProperties(String topic) {
210 Properties props = new Properties();
212 props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS, topic);
214 props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
215 + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
216 props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
217 + PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
218 props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
219 + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
224 private static Properties makeSourceProperties(String topic) {
225 Properties props = new Properties();
227 props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
229 props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
230 + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
231 props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
232 + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
233 props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
234 + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
236 if (EXTERNAL_TOPIC.equals(topic)) {
237 // consumer group is a constant
239 PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
240 + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX,
243 // consumer instance is generated by the BusConsumer code
246 // else internal topic: feature populates info for internal topic
252 * Context used for a single test case.
254 private static class Context {
256 private final FeatureFactory featureFactory;
257 private final ManagerFactory managerFactory;
260 * Hosts that have been added to this context.
262 private final Deque<Host> hosts = new LinkedList<>();
265 * Maps a drools controller to its policy controller.
267 private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
270 * Counts the number of decode errors.
272 private final AtomicInteger nDecodeErrors = new AtomicInteger(0);
275 * Number of events we're still waiting to receive.
277 private final CountDownLatch eventCounter;
281 * @param nEvents number of events to be processed
283 public Context(int nEvents) {
284 featureFactory = new FeatureFactory(this);
285 managerFactory = new ManagerFactory(this);
286 eventCounter = new CountDownLatch(nEvents);
288 PoolingFeature.setFactory(featureFactory);
289 PoolingManagerImpl.setFactory(managerFactory);
293 * Destroys the context, stopping any hosts that remain.
295 public void destroy() {
301 * Creates and adds a new host to the context.
303 * @return the new Host
305 public Host addHost() {
306 Host host = new Host(this);
315 public void startHosts() {
316 hosts.forEach(host -> host.start());
322 public void stopHosts() {
323 hosts.forEach(host -> host.stop());
327 * Verifies that all hosts processed at least one message.
329 public void checkAllSawAMsg() {
331 for (Host host : hosts) {
332 assertTrue("x=" + x, host.messageSeen());
338 * Offers an event to the external topic.
342 public void offerExternal(String event) {
343 externalSink.send(event);
350 * @return the decoded event, or {@code null} if it cannot be decoded
352 public Object decodeEvent(String event) {
353 return managerFactory.decodeEvent(null, null, event);
357 * Associates a controller with its drools controller.
360 * @param droolsController
362 public void addController(PolicyController controller, DroolsController droolsController) {
363 drools2policy.put(droolsController, controller);
367 * @param droolsController
368 * @return the controller associated with a drools controller, or {@code null} if
369 * it has no associated controller
371 public PolicyController getController(DroolsController droolsController) {
372 return drools2policy.get(droolsController);
377 * @return the number of decode errors so far
379 public int getDecodeErrors() {
380 return nDecodeErrors.get();
384 * Increments the count of decode errors.
386 public void bumpDecodeErrors() {
387 nDecodeErrors.incrementAndGet();
392 * @return the number of events that haven't been processed
394 public long getRemainingEvents() {
395 return eventCounter.getCount();
399 * Adds an event to the counter.
401 public void addEvent() {
402 eventCounter.countDown();
406 * Waits, for a period of time, for all events to be processed.
410 * @return {@code true} if all events have been processed, {@code false} otherwise
411 * @throws InterruptedException
413 public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
414 return eventCounter.await(time, units);
418 * Waits, for a period of time, for all hosts to enter the Active state.
420 * @param timeMs maximum time to wait, in milliseconds
421 * @throws InterruptedException
423 public void awaitAllActive(long timeMs) throws InterruptedException {
424 long tend = timeMs + System.currentTimeMillis();
426 for (Host host : hosts) {
427 long tremain = Math.max(0, tend - System.currentTimeMillis());
428 assertTrue(host.awaitActive(tremain));
434 * Simulates a single "host".
436 private static class Host {
438 private final PoolingFeature feature = new PoolingFeature();
441 * {@code True} if this host has processed a message, {@code false} otherwise.
443 private final AtomicBoolean sawMsg = new AtomicBoolean(false);
445 private final TopicSource externalSource;
448 private final PolicyEngine engine = mock(PolicyEngine.class);
449 private final ListenerController controller = mock(ListenerController.class);
450 private final DroolsController drools = mock(DroolsController.class);
456 public Host(Context context) {
458 when(controller.getName()).thenReturn(CONTROLLER1);
459 when(controller.getDrools()).thenReturn(drools);
461 Properties props = makeSourceProperties(EXTERNAL_TOPIC);
462 externalSource = TopicEndpoint.manager.addTopicSources(props).get(0);
464 // stop consuming events if the controller stops
465 when(controller.stop()).thenAnswer(args -> {
466 externalSource.unregister(controller);
470 doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
472 context.addController(controller, drools);
476 * Waits, for a period of time, for the host to enter the Active state.
478 * @param timeMs time to wait, in milliseconds
479 * @return {@code true} if the host entered the Active state within the given
480 * amount of time, {@code false} otherwise
481 * @throws InterruptedException
483 public boolean awaitActive(long timeMs) throws InterruptedException {
484 return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
488 * Starts threads for the host so that it begins consuming from both the external
489 * "DMaaP" topic and its own internal "DMaaP" topic.
491 public void start() {
492 feature.beforeStart(engine);
493 feature.afterCreate(controller);
495 feature.beforeStart(controller);
497 // start consuming events from the external topic
498 externalSource.register(controller);
500 feature.afterStart(controller);
504 * Stops the host's threads.
507 feature.beforeStop(controller);
508 externalSource.unregister(controller);
509 feature.afterStop(controller);
513 * Offers an event to the feature, before the policy controller handles it.
518 * @return {@code true} if the event was handled, {@code false} otherwise
520 public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
521 return feature.beforeOffer(controller, protocol, topic2, event);
525 * Offers an event to the feature, after the policy controller handles it.
531 * @return {@code true} if the event was handled, {@code false} otherwise
533 public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
535 return feature.afterOffer(controller, protocol, topic, event, success);
539 * Offers an event to the feature, before the drools controller handles it.
542 * @return {@code true} if the event was handled, {@code false} otherwise
544 public boolean beforeInsert(Object fact) {
545 return feature.beforeInsert(drools, fact);
549 * Offers an event to the feature, after the drools controller handles it.
552 * @param successInsert {@code true} if it was successfully inserted by the drools
553 * controller, {@code false} otherwise
554 * @return {@code true} if the event was handled, {@code false} otherwise
556 public boolean afterInsert(Object fact, boolean successInsert) {
557 return feature.afterInsert(drools, fact, successInsert);
561 * Indicates that a message was seen for this host.
563 public void sawMessage() {
569 * @return {@code true} if a message was seen for this host, {@code false}
572 public boolean messageSeen() {
578 * Listener for the external topic. Simulates the actions taken by
579 * <i>AggregatedPolicyController.onTopicEvent</i>.
581 private static class MyExternalTopicListener implements Answer<Void> {
583 private final Context context;
584 private final Host host;
586 public MyExternalTopicListener(Context context, Host host) {
587 this.context = context;
592 public Void answer(InvocationOnMock args) throws Throwable {
594 CommInfrastructure commType = args.getArgument(i++);
595 String topic = args.getArgument(i++);
596 String event = args.getArgument(i++);
598 if (host.beforeOffer(commType, topic, event)) {
603 Object fact = context.decodeEvent(event);
607 context.bumpDecodeErrors();
612 if (!host.beforeInsert(fact)) {
613 // feature did not handle it so we handle it here
614 host.afterInsert(fact, result);
621 host.afterOffer(commType, topic, event, result);
627 * Simulator for the feature-level factory.
629 private static class FeatureFactory extends PoolingFeature.Factory {
631 private final Context context;
637 public FeatureFactory(Context context) {
638 this.context = context;
641 * Note: do NOT extract anything from "context" at this point, because it
642 * hasn't been fully initialized yet
647 public Properties getProperties(String featName) {
648 Properties props = new Properties();
650 props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
652 props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
653 props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
654 props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
655 props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
656 props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
657 "" + STD_OFFLINE_PUB_WAIT_MS);
658 props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
659 "" + STD_START_HEARTBEAT_MS);
660 props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
661 props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
662 props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
663 "" + STD_ACTIVE_HEARTBEAT_MS);
664 props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
665 "" + STD_INTER_HEARTBEAT_MS);
667 props.putAll(makeSinkProperties(INTERNAL_TOPIC));
668 props.putAll(makeSourceProperties(INTERNAL_TOPIC));
674 public PolicyController getController(DroolsController droolsController) {
675 return context.getController(droolsController);
680 * Simulator for the pooling manager factory.
682 private static class ManagerFactory extends PoolingManagerImpl.Factory {
685 * Used to decode events from the external topic.
687 private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
689 protected ObjectMapper initialValue() {
690 return new ObjectMapper();
695 * Used to decode events into a Map.
697 private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {};
703 public ManagerFactory(Context context) {
706 * Note: do NOT extract anything from "context" at this point, because it
707 * hasn't been fully initialized yet
712 public boolean canDecodeEvent(DroolsController drools, String topic) {
717 public Object decodeEvent(DroolsController drools, String topic, String event) {
719 return mapper.get().readValue(event, typeRef);
721 } catch (IOException e) {
722 logger.warn("cannot decode external event", e);
729 * Controller that also implements the {@link TopicListener} interface.
731 private static interface ListenerController extends PolicyController, TopicListener {