Merge "Close old UEB/DMaaP consumer"
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / FeatureTest2.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.doAnswer;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29 import static org.onap.policy.common.utils.properties.SpecPropertyConfiguration.specialize;
30 import java.io.IOException;
31 import java.util.Deque;
32 import java.util.IdentityHashMap;
33 import java.util.LinkedList;
34 import java.util.Properties;
35 import java.util.TreeMap;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import org.junit.After;
41 import org.junit.AfterClass;
42 import org.junit.Before;
43 import org.junit.BeforeClass;
44 import org.junit.Ignore;
45 import org.junit.Test;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.mockito.stubbing.Answer;
48 import org.onap.policy.drools.controller.DroolsController;
49 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
50 import org.onap.policy.drools.event.comm.TopicEndpoint;
51 import org.onap.policy.drools.event.comm.TopicListener;
52 import org.onap.policy.drools.event.comm.TopicSink;
53 import org.onap.policy.drools.event.comm.TopicSource;
54 import org.onap.policy.drools.properties.PolicyProperties;
55 import org.onap.policy.drools.system.PolicyController;
56 import org.onap.policy.drools.system.PolicyEngine;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import com.fasterxml.jackson.core.type.TypeReference;
60 import com.fasterxml.jackson.databind.ObjectMapper;
61
62 /**
63  * End-to-end tests of the pooling feature. Launches one or more "hosts", each one having
64  * its own feature object. Uses real feature objects, as well as real DMaaP sources and
65  * sinks. However, the following are not:
66  * <dl>
67  * <dt>PolicyEngine, PolicyController, DroolsController</dt>
68  * <dd>mocked</dd>
69  * </dl>
70  * 
71  * <p>
72  * The following fields must be set before executing this:
73  * <ul>
74  * <li>UEB_SERVERS</li>
75  * <li>INTERNAL_TOPIC</li>
76  * <li>EXTERNAL_TOPIC</li>
77  * </ul>
78  */
79 public class FeatureTest2 {
80
81     private static final Logger logger = LoggerFactory.getLogger(FeatureTest2.class);
82
83     /**
84      * UEB servers for both internal & external topics.
85      */
86     private static final String UEB_SERVERS = "";
87
88     /**
89      * Name of the topic used for inter-host communication.
90      */
91     private static final String INTERNAL_TOPIC = "";
92
93     /**
94      * Name of the topic from which "external" events "arrive".
95      */
96     private static final String EXTERNAL_TOPIC = "";
97
98     /**
99      * Consumer group to use when polling the external topic.
100      */
101     private static final String EXTERNAL_GROUP = FeatureTest2.class.getName();
102
103     /**
104      * Name of the controller.
105      */
106     private static final String CONTROLLER1 = "controller.one";
107
108     /**
109      * Maximum number of items to fetch from DMaaP in a single poll.
110      */
111     private static final String FETCH_LIMIT = "5";
112
113     private static final long STD_REACTIVATE_WAIT_MS = 10000;
114     private static final long STD_IDENTIFICATION_MS = 10000;
115     private static final long STD_START_HEARTBEAT_MS = 15000;
116     private static final long STD_ACTIVE_HEARTBEAT_MS = 12000;
117     private static final long STD_INTER_HEARTBEAT_MS = 5000;
118     private static final long STD_OFFLINE_PUB_WAIT_MS = 2;
119     private static final long EVENT_WAIT_SEC = 15;
120
121     // these are saved and restored on exit from this test class
122     private static PoolingFeature.Factory saveFeatureFactory;
123     private static PoolingManagerImpl.Factory saveManagerFactory;
124
125     /**
126      * Sink for external DMaaP topic.
127      */
128     private static TopicSink externalSink;
129
130     /**
131      * Context for the current test case.
132      */
133     private Context ctx;
134
135
136     @BeforeClass
137     public static void setUpBeforeClass() {
138         saveFeatureFactory = PoolingFeature.getFactory();
139         saveManagerFactory = PoolingManagerImpl.getFactory();
140
141         Properties props = makeSinkProperties(EXTERNAL_TOPIC);
142         externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0);
143         externalSink.start();
144     }
145
146     @AfterClass
147     public static void tearDownAfterClass() {
148         PoolingFeature.setFactory(saveFeatureFactory);
149         PoolingManagerImpl.setFactory(saveManagerFactory);
150
151         externalSink.stop();
152     }
153
154     @Before
155     public void setUp() {
156         ctx = null;
157     }
158
159     @After
160     public void tearDown() {
161         if (ctx != null) {
162             ctx.destroy();
163         }
164     }
165
166     @Ignore
167     @Test
168     public void test_SingleHost() throws Exception {
169         run(70, 1);
170     }
171
172     @Ignore
173     @Test
174     public void test_TwoHosts() throws Exception {
175         run(200, 2);
176     }
177
178     @Ignore
179     @Test
180     public void test_ThreeHosts() throws Exception {
181         run(200, 3);
182     }
183
184     private void run(int nmessages, int nhosts) throws Exception {
185         ctx = new Context(nmessages);
186
187         for (int x = 0; x < nhosts; ++x) {
188             ctx.addHost();
189         }
190
191         ctx.startHosts();
192         ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
193
194         for (int x = 0; x < nmessages; ++x) {
195             ctx.offerExternal(makeMessage(x));
196         }
197
198         ctx.awaitEvents(EVENT_WAIT_SEC, TimeUnit.SECONDS);
199
200         assertEquals(0, ctx.getDecodeErrors());
201         assertEquals(0, ctx.getRemainingEvents());
202         ctx.checkAllSawAMsg();
203     }
204
205     private String makeMessage(int reqnum) {
206         return "{\"reqid\":\"req" + reqnum + "\", \"data\":\"hello " + reqnum + "\"}";
207     }
208
209     private static Properties makeSinkProperties(String topic) {
210         Properties props = new Properties();
211
212         props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS, topic);
213
214         props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
215                         + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
216         props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
217                         + PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
218         props.setProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
219                         + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
220
221         return props;
222     }
223
224     private static Properties makeSourceProperties(String topic) {
225         Properties props = new Properties();
226
227         props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS, topic);
228
229         props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
230                         + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
231         props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
232                         + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
233         props.setProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
234                         + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "false");
235
236         if (EXTERNAL_TOPIC.equals(topic)) {
237             // consumer group is a constant
238             props.setProperty(
239                             PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
240                                             + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX,
241                             EXTERNAL_GROUP);
242
243             // consumer instance is generated by the BusConsumer code
244         }
245
246         // else internal topic: feature populates info for internal topic
247
248         return props;
249     }
250
251     /**
252      * Context used for a single test case.
253      */
254     private static class Context {
255
256         private final FeatureFactory featureFactory;
257         private final ManagerFactory managerFactory;
258
259         /**
260          * Hosts that have been added to this context.
261          */
262         private final Deque<Host> hosts = new LinkedList<>();
263
264         /**
265          * Maps a drools controller to its policy controller.
266          */
267         private final IdentityHashMap<DroolsController, PolicyController> drools2policy = new IdentityHashMap<>();
268
269         /**
270          * Counts the number of decode errors.
271          */
272         private final AtomicInteger nDecodeErrors = new AtomicInteger(0);
273
274         /**
275          * Number of events we're still waiting to receive.
276          */
277         private final CountDownLatch eventCounter;
278
279         /**
280          * 
281          * @param nEvents number of events to be processed
282          */
283         public Context(int nEvents) {
284             featureFactory = new FeatureFactory(this);
285             managerFactory = new ManagerFactory(this);
286             eventCounter = new CountDownLatch(nEvents);
287
288             PoolingFeature.setFactory(featureFactory);
289             PoolingManagerImpl.setFactory(managerFactory);
290         }
291
292         /**
293          * Destroys the context, stopping any hosts that remain.
294          */
295         public void destroy() {
296             stopHosts();
297             hosts.clear();
298         }
299
300         /**
301          * Creates and adds a new host to the context.
302          * 
303          * @return the new Host
304          */
305         public Host addHost() {
306             Host host = new Host(this);
307             hosts.add(host);
308
309             return host;
310         }
311
312         /**
313          * Starts the hosts.
314          */
315         public void startHosts() {
316             hosts.forEach(host -> host.start());
317         }
318
319         /**
320          * Stops the hosts.
321          */
322         public void stopHosts() {
323             hosts.forEach(host -> host.stop());
324         }
325
326         /**
327          * Verifies that all hosts processed at least one message.
328          */
329         public void checkAllSawAMsg() {
330             int x = 0;
331             for (Host host : hosts) {
332                 assertTrue("x=" + x, host.messageSeen());
333                 ++x;
334             }
335         }
336
337         /**
338          * Offers an event to the external topic.
339          * 
340          * @param event
341          */
342         public void offerExternal(String event) {
343             externalSink.send(event);
344         }
345
346         /**
347          * Decodes an event.
348          * 
349          * @param event
350          * @return the decoded event, or {@code null} if it cannot be decoded
351          */
352         public Object decodeEvent(String event) {
353             return managerFactory.decodeEvent(null, null, event);
354         }
355
356         /**
357          * Associates a controller with its drools controller.
358          * 
359          * @param controller
360          * @param droolsController
361          */
362         public void addController(PolicyController controller, DroolsController droolsController) {
363             drools2policy.put(droolsController, controller);
364         }
365
366         /**
367          * @param droolsController
368          * @return the controller associated with a drools controller, or {@code null} if
369          *         it has no associated controller
370          */
371         public PolicyController getController(DroolsController droolsController) {
372             return drools2policy.get(droolsController);
373         }
374
375         /**
376          * 
377          * @return the number of decode errors so far
378          */
379         public int getDecodeErrors() {
380             return nDecodeErrors.get();
381         }
382
383         /**
384          * Increments the count of decode errors.
385          */
386         public void bumpDecodeErrors() {
387             nDecodeErrors.incrementAndGet();
388         }
389
390         /**
391          * 
392          * @return the number of events that haven't been processed
393          */
394         public long getRemainingEvents() {
395             return eventCounter.getCount();
396         }
397
398         /**
399          * Adds an event to the counter.
400          */
401         public void addEvent() {
402             eventCounter.countDown();
403         }
404
405         /**
406          * Waits, for a period of time, for all events to be processed.
407          * 
408          * @param time
409          * @param units
410          * @return {@code true} if all events have been processed, {@code false} otherwise
411          * @throws InterruptedException
412          */
413         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
414             return eventCounter.await(time, units);
415         }
416
417         /**
418          * Waits, for a period of time, for all hosts to enter the Active state.
419          * 
420          * @param timeMs maximum time to wait, in milliseconds
421          * @throws InterruptedException
422          */
423         public void awaitAllActive(long timeMs) throws InterruptedException {
424             long tend = timeMs + System.currentTimeMillis();
425
426             for (Host host : hosts) {
427                 long tremain = Math.max(0, tend - System.currentTimeMillis());
428                 assertTrue(host.awaitActive(tremain));
429             }
430         }
431     }
432
433     /**
434      * Simulates a single "host".
435      */
436     private static class Host {
437
438         private final PoolingFeature feature = new PoolingFeature();
439
440         /**
441          * {@code True} if this host has processed a message, {@code false} otherwise.
442          */
443         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
444
445         private final TopicSource externalSource;
446
447         // mock objects
448         private final PolicyEngine engine = mock(PolicyEngine.class);
449         private final ListenerController controller = mock(ListenerController.class);
450         private final DroolsController drools = mock(DroolsController.class);
451
452         /**
453          * 
454          * @param context
455          */
456         public Host(Context context) {
457
458             when(controller.getName()).thenReturn(CONTROLLER1);
459             when(controller.getDrools()).thenReturn(drools);
460
461             Properties props = makeSourceProperties(EXTERNAL_TOPIC);
462             externalSource = TopicEndpoint.manager.addTopicSources(props).get(0);
463
464             // stop consuming events if the controller stops
465             when(controller.stop()).thenAnswer(args -> {
466                 externalSource.unregister(controller);
467                 return true;
468             });
469
470             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
471
472             context.addController(controller, drools);
473         }
474
475         /**
476          * Waits, for a period of time, for the host to enter the Active state.
477          * 
478          * @param timeMs time to wait, in milliseconds
479          * @return {@code true} if the host entered the Active state within the given
480          *         amount of time, {@code false} otherwise
481          * @throws InterruptedException
482          */
483         public boolean awaitActive(long timeMs) throws InterruptedException {
484             return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
485         }
486
487         /**
488          * Starts threads for the host so that it begins consuming from both the external
489          * "DMaaP" topic and its own internal "DMaaP" topic.
490          */
491         public void start() {
492             feature.beforeStart(engine);
493             feature.afterCreate(controller);
494
495             feature.beforeStart(controller);
496
497             // start consuming events from the external topic
498             externalSource.register(controller);
499
500             feature.afterStart(controller);
501         }
502
503         /**
504          * Stops the host's threads.
505          */
506         public void stop() {
507             feature.beforeStop(controller);
508             externalSource.unregister(controller);
509             feature.afterStop(controller);
510         }
511
512         /**
513          * Offers an event to the feature, before the policy controller handles it.
514          * 
515          * @param protocol
516          * @param topic2
517          * @param event
518          * @return {@code true} if the event was handled, {@code false} otherwise
519          */
520         public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
521             return feature.beforeOffer(controller, protocol, topic2, event);
522         }
523
524         /**
525          * Offers an event to the feature, after the policy controller handles it.
526          * 
527          * @param protocol
528          * @param topic
529          * @param event
530          * @param success
531          * @return {@code true} if the event was handled, {@code false} otherwise
532          */
533         public boolean afterOffer(CommInfrastructure protocol, String topic, String event, boolean success) {
534
535             return feature.afterOffer(controller, protocol, topic, event, success);
536         }
537
538         /**
539          * Offers an event to the feature, before the drools controller handles it.
540          * 
541          * @param fact
542          * @return {@code true} if the event was handled, {@code false} otherwise
543          */
544         public boolean beforeInsert(Object fact) {
545             return feature.beforeInsert(drools, fact);
546         }
547
548         /**
549          * Offers an event to the feature, after the drools controller handles it.
550          * 
551          * @param fact
552          * @param successInsert {@code true} if it was successfully inserted by the drools
553          *        controller, {@code false} otherwise
554          * @return {@code true} if the event was handled, {@code false} otherwise
555          */
556         public boolean afterInsert(Object fact, boolean successInsert) {
557             return feature.afterInsert(drools, fact, successInsert);
558         }
559
560         /**
561          * Indicates that a message was seen for this host.
562          */
563         public void sawMessage() {
564             sawMsg.set(true);
565         }
566
567         /**
568          * 
569          * @return {@code true} if a message was seen for this host, {@code false}
570          *         otherwise
571          */
572         public boolean messageSeen() {
573             return sawMsg.get();
574         }
575     }
576
577     /**
578      * Listener for the external topic. Simulates the actions taken by
579      * <i>AggregatedPolicyController.onTopicEvent</i>.
580      */
581     private static class MyExternalTopicListener implements Answer<Void> {
582
583         private final Context context;
584         private final Host host;
585
586         public MyExternalTopicListener(Context context, Host host) {
587             this.context = context;
588             this.host = host;
589         }
590
591         @Override
592         public Void answer(InvocationOnMock args) throws Throwable {
593             int i = 0;
594             CommInfrastructure commType = args.getArgument(i++);
595             String topic = args.getArgument(i++);
596             String event = args.getArgument(i++);
597
598             if (host.beforeOffer(commType, topic, event)) {
599                 return null;
600             }
601
602             boolean result;
603             Object fact = context.decodeEvent(event);
604
605             if (fact == null) {
606                 result = false;
607                 context.bumpDecodeErrors();
608
609             } else {
610                 result = true;
611
612                 if (!host.beforeInsert(fact)) {
613                     // feature did not handle it so we handle it here
614                     host.afterInsert(fact, result);
615
616                     host.sawMessage();
617                     context.addEvent();
618                 }
619             }
620
621             host.afterOffer(commType, topic, event, result);
622             return null;
623         }
624     }
625
626     /**
627      * Simulator for the feature-level factory.
628      */
629     private static class FeatureFactory extends PoolingFeature.Factory {
630
631         private final Context context;
632
633         /**
634          * 
635          * @param context
636          */
637         public FeatureFactory(Context context) {
638             this.context = context;
639
640             /*
641              * Note: do NOT extract anything from "context" at this point, because it
642              * hasn't been fully initialized yet
643              */
644         }
645
646         @Override
647         public Properties getProperties(String featName) {
648             Properties props = new Properties();
649
650             props.setProperty(PoolingProperties.PROP_EXTRACTOR_PREFIX + ".java.util.Map", "${reqid}");
651
652             props.setProperty(specialize(PoolingProperties.FEATURE_ENABLED, CONTROLLER1), "true");
653             props.setProperty(specialize(PoolingProperties.POOLING_TOPIC, CONTROLLER1), INTERNAL_TOPIC);
654             props.setProperty(specialize(PoolingProperties.OFFLINE_LIMIT, CONTROLLER1), "10000");
655             props.setProperty(specialize(PoolingProperties.OFFLINE_AGE_MS, CONTROLLER1), "1000000");
656             props.setProperty(specialize(PoolingProperties.OFFLINE_PUB_WAIT_MS, CONTROLLER1),
657                             "" + STD_OFFLINE_PUB_WAIT_MS);
658             props.setProperty(specialize(PoolingProperties.START_HEARTBEAT_MS, CONTROLLER1),
659                             "" + STD_START_HEARTBEAT_MS);
660             props.setProperty(specialize(PoolingProperties.REACTIVATE_MS, CONTROLLER1), "" + STD_REACTIVATE_WAIT_MS);
661             props.setProperty(specialize(PoolingProperties.IDENTIFICATION_MS, CONTROLLER1), "" + STD_IDENTIFICATION_MS);
662             props.setProperty(specialize(PoolingProperties.ACTIVE_HEARTBEAT_MS, CONTROLLER1),
663                             "" + STD_ACTIVE_HEARTBEAT_MS);
664             props.setProperty(specialize(PoolingProperties.INTER_HEARTBEAT_MS, CONTROLLER1),
665                             "" + STD_INTER_HEARTBEAT_MS);
666
667             props.putAll(makeSinkProperties(INTERNAL_TOPIC));
668             props.putAll(makeSourceProperties(INTERNAL_TOPIC));
669
670             return props;
671         }
672
673         @Override
674         public PolicyController getController(DroolsController droolsController) {
675             return context.getController(droolsController);
676         }
677     }
678
679     /**
680      * Simulator for the pooling manager factory.
681      */
682     private static class ManagerFactory extends PoolingManagerImpl.Factory {
683
684         /**
685          * Used to decode events from the external topic.
686          */
687         private final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() {
688             @Override
689             protected ObjectMapper initialValue() {
690                 return new ObjectMapper();
691             }
692         };
693
694         /**
695          * Used to decode events into a Map.
696          */
697         private final TypeReference<TreeMap<String, String>> typeRef = new TypeReference<TreeMap<String, String>>() {};
698
699         /**
700          * 
701          * @param context
702          */
703         public ManagerFactory(Context context) {
704
705             /*
706              * Note: do NOT extract anything from "context" at this point, because it
707              * hasn't been fully initialized yet
708              */
709         }
710
711         @Override
712         public boolean canDecodeEvent(DroolsController drools, String topic) {
713             return true;
714         }
715
716         @Override
717         public Object decodeEvent(DroolsController drools, String topic, String event) {
718             try {
719                 return mapper.get().readValue(event, typeRef);
720
721             } catch (IOException e) {
722                 logger.warn("cannot decode external event", e);
723                 return null;
724             }
725         }
726     }
727
728     /**
729      * Controller that also implements the {@link TopicListener} interface.
730      */
731     private static interface ListenerController extends PolicyController, TopicListener {
732
733     }
734 }