Make feature-pooling-dmaap work without filtering
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingManagerImpl.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import com.google.gson.JsonParseException;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.common.endpoints.event.comm.TopicListener;
32 import org.onap.policy.drools.controller.DroolsController;
33 import org.onap.policy.drools.pooling.message.BucketAssignments;
34 import org.onap.policy.drools.pooling.message.Leader;
35 import org.onap.policy.drools.pooling.message.Message;
36 import org.onap.policy.drools.pooling.message.Offline;
37 import org.onap.policy.drools.pooling.state.ActiveState;
38 import org.onap.policy.drools.pooling.state.IdleState;
39 import org.onap.policy.drools.pooling.state.InactiveState;
40 import org.onap.policy.drools.pooling.state.QueryState;
41 import org.onap.policy.drools.pooling.state.StartState;
42 import org.onap.policy.drools.pooling.state.State;
43 import org.onap.policy.drools.pooling.state.StateTimerTask;
44 import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
45 import org.onap.policy.drools.system.PolicyController;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
51  * events coming from external topics are saved in a queue for later processing. Once
52  * assignments are made, the saved events are processed. In addition, while the controller
53  * is locked, events are still forwarded to other hosts and bucket assignments are still
54  * updated, based on any {@link Leader} messages that it receives.
55  */
56 public class PoolingManagerImpl implements PoolingManager, TopicListener {
57
58     private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
59
60     /**
61      * Maximum number of times a message can be forwarded.
62      */
63     public static final int MAX_HOPS = 5;
64
65     /**
66      * ID of this host.
67      */
68     private final String host;
69
70     /**
71      * Properties with which this was configured.
72      */
73     private final PoolingProperties props;
74
75     /**
76      * Associated controller.
77      */
78     private final PolicyController controller;
79
80     /**
81      * Decremented each time the manager enters the Active state. Used by junit tests.
82      */
83     private final CountDownLatch activeLatch;
84
85     /**
86      * Used to encode & decode request objects received from & sent to a rule engine.
87      */
88     private final Serializer serializer;
89
90     /**
91      * Internal DMaaP topic used by this controller.
92      */
93     private final String topic;
94
95     /**
96      * Manager for the internal DMaaP topic.
97      */
98     private final DmaapManager dmaapMgr;
99
100     /**
101      * Lock used while updating {@link #current}. In general, public methods must use
102      * this, while private methods assume the lock is already held.
103      */
104     private final Object curLocker = new Object();
105
106     /**
107      * Current state.
108      *
109      * <p>This uses a finite state machine, wherein the state object contains all of the data
110      * relevant to that state. Each state object has a process() method, specific to each
111      * type of {@link Message} subclass. The method returns the next state object, or
112      * {@code null} if the state is to remain the same.
113      */
114     private State current;
115
116     /**
117      * Current bucket assignments or {@code null}.
118      */
119     private BucketAssignments assignments = null;
120
121     /**
122      * Pool used to execute timers.
123      */
124     private ScheduledThreadPoolExecutor scheduler = null;
125
126     /**
127      * Constructs the manager, initializing all of the data structures.
128      *
129      * @param host name/uuid of this host
130      * @param controller controller with which this is associated
131      * @param props feature properties specific to the controller
132      * @param activeLatch latch to be decremented each time the manager enters the Active
133      *        state
134      */
135     public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
136                     CountDownLatch activeLatch) {
137         this.host = host;
138         this.controller = controller;
139         this.props = props;
140         this.activeLatch = activeLatch;
141
142         try {
143             this.serializer = new Serializer();
144             this.topic = props.getPoolingTopic();
145             this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
146             this.current = new IdleState(this);
147
148             logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
149
150         } catch (ClassCastException e) {
151             logger.error("not a topic listener, controller {}", controller.getName());
152             throw new PoolingFeatureRtException(e);
153
154         } catch (PoolingFeatureException e) {
155             logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
156             throw new PoolingFeatureRtException(e);
157         }
158     }
159
160     /**
161      * Should only be used by junit tests.
162      *
163      * @return the current state
164      */
165     protected State getCurrent() {
166         synchronized (curLocker) {
167             return current;
168         }
169     }
170
171     @Override
172     public String getHost() {
173         return host;
174     }
175
176     @Override
177     public String getTopic() {
178         return topic;
179     }
180
181     @Override
182     public PoolingProperties getProperties() {
183         return props;
184     }
185
186     /**
187      * Indicates that the controller is about to start. Starts the publisher for the
188      * internal topic, and creates a thread pool for the timers.
189      */
190     public void beforeStart() {
191         synchronized (curLocker) {
192             if (scheduler == null) {
193                 dmaapMgr.startPublisher();
194
195                 logger.debug("make scheduler thread for topic {}", getTopic());
196                 scheduler = makeScheduler();
197
198                 /*
199                  * Only a handful of timers at any moment, thus we can afford to take the
200                  * time to remove them when they're cancelled.
201                  */
202                 scheduler.setRemoveOnCancelPolicy(true);
203                 scheduler.setMaximumPoolSize(1);
204                 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
205                 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
206             }
207         }
208     }
209
210     /**
211      * Indicates that the controller has successfully started. Starts the consumer for the
212      * internal topic, enters the {@link StartState}, and sets the filter for the initial
213      * state.
214      */
215     public void afterStart() {
216         synchronized (curLocker) {
217             if (current instanceof IdleState) {
218                 dmaapMgr.startConsumer(this);
219                 changeState(new StartState(this));
220             }
221         }
222     }
223
224     /**
225      * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
226      * and the current state.
227      */
228     public void beforeStop() {
229         ScheduledThreadPoolExecutor sched;
230
231         synchronized (curLocker) {
232             sched = scheduler;
233             scheduler = null;
234
235             if (!(current instanceof IdleState)) {
236                 changeState(new IdleState(this));
237                 dmaapMgr.stopConsumer(this);
238                 publishAdmin(new Offline(getHost()));
239             }
240
241             assignments = null;
242         }
243
244         if (sched != null) {
245             logger.debug("stop scheduler for topic {}", getTopic());
246             sched.shutdownNow();
247         }
248     }
249
250     /**
251      * Indicates that the controller has stopped. Stops the publisher and logs a warning
252      * if any events are still in the queue.
253      */
254     public void afterStop() {
255         synchronized (curLocker) {
256             /*
257              * stop the publisher, but allow time for any Offline message to be
258              * transmitted
259              */
260             dmaapMgr.stopPublisher(props.getOfflinePubWaitMs());
261         }
262     }
263
264     /**
265      * Indicates that the controller is about to be locked. Enters the idle state, as all
266      * it will be doing is forwarding messages.
267      */
268     public void beforeLock() {
269         logger.info("locking manager for topic {}", getTopic());
270
271         synchronized (curLocker) {
272             changeState(new IdleState(this));
273         }
274     }
275
276     /**
277      * Indicates that the controller has been unlocked. Enters the start state, if the
278      * controller is running.
279      */
280     public void afterUnlock() {
281         logger.info("unlocking manager for topic {}", getTopic());
282
283         synchronized (curLocker) {
284             if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
285                 changeState(new StartState(this));
286             }
287         }
288     }
289
290     /**
291      * Changes the finite state machine to a new state, provided the new state is not
292      * {@code null}.
293      *
294      * @param newState new state, or {@code null} if to remain unchanged
295      */
296     private void changeState(State newState) {
297         if (newState != null) {
298             current.cancelTimers();
299             current = newState;
300
301             newState.start();
302         }
303     }
304
305     @Override
306     public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
307         // wrap the task in a TimerAction and schedule it
308         ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
309
310         // wrap the future in a "CancellableScheduledTask"
311         return () -> fut.cancel(false);
312     }
313
314     @Override
315     public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
316         // wrap the task in a TimerAction and schedule it
317         ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
318                         TimeUnit.MILLISECONDS);
319
320         // wrap the future in a "CancellableScheduledTask"
321         return () -> fut.cancel(false);
322     }
323
324     @Override
325     public void publishAdmin(Message msg) {
326         publish(Message.ADMIN, msg);
327     }
328
329     @Override
330     public void publish(String channel, Message msg) {
331         logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
332
333         msg.setChannel(channel);
334
335         try {
336             // ensure it's valid before we send it
337             msg.checkValidity();
338
339             String txt = serializer.encodeMsg(msg);
340             dmaapMgr.publish(txt);
341
342         } catch (JsonParseException e) {
343             logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
344
345         } catch (PoolingFeatureException e) {
346             logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
347         }
348     }
349
350     /**
351      * Handles an event from the internal topic.
352      *
353      * @param commType comm infrastructure
354      * @param topic2 topic
355      * @param event event
356      */
357     @Override
358     public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
359
360         if (event == null) {
361             logger.error("null event on topic {}", topic);
362             return;
363         }
364
365         synchronized (curLocker) {
366             // it's on the internal topic
367             handleInternal(event);
368         }
369     }
370
371     /**
372      * Called by the PolicyController before it offers the event to the DroolsController.
373      * If the controller is locked, then it isn't processing events. However, they still
374      * need to be forwarded, thus in that case, they are decoded and forwarded.
375      *
376      * <p>On the other hand, if the controller is not locked, then we just return immediately
377      * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
378      * it instead, as it already has the decoded message.
379      *
380      * @param topic2 topic
381      * @param event event
382      * @return {@code true} if the event was handled by the manager, {@code false} if it
383      *         must still be handled by the invoker
384      */
385     public boolean beforeOffer(String topic2, String event) {
386
387         if (!controller.isLocked()) {
388             // we should NOT intercept this message - let the invoker handle it
389             return false;
390         }
391
392         return handleExternal(topic2, decodeEvent(topic2, event));
393     }
394
395     /**
396      * Called by the DroolsController before it inserts the event into the rule engine.
397      *
398      * @param topic2 topic
399      * @param event event, as an object
400      * @return {@code true} if the event was handled by the manager, {@code false} if it
401      *         must still be handled by the invoker
402      */
403     public boolean beforeInsert(String topic2, Object event) {
404         return handleExternal(topic2, event);
405     }
406
407     /**
408      * Handles an event from an external topic.
409      *
410      * @param topic2 topic
411      * @param event event, as an object, or {@code null} if it cannot be decoded
412      * @return {@code true} if the event was handled by the manager, {@code false} if it
413      *         must still be handled by the invoker
414      */
415     private boolean handleExternal(String topic2, Object event) {
416         if (event == null) {
417             // no event - let the invoker handle it
418             return false;
419         }
420
421         synchronized (curLocker) {
422             return handleExternal(topic2, event, event.hashCode());
423         }
424     }
425
426     /**
427      * Handles an event from an external topic.
428      *
429      * @param topic2 topic
430      * @param event event, as an object
431      * @param eventHashCode event's hash code
432      * @return {@code true} if the event was handled, {@code false} if the invoker should
433      *         handle it
434      */
435     private boolean handleExternal(String topic2, Object event, int eventHashCode) {
436         if (assignments == null) {
437             // no bucket assignments yet - handle locally
438             logger.info("handle event locally for request {}", event);
439
440             // we did NOT consume the event
441             return false;
442
443         } else {
444             return handleEvent(topic2, event, eventHashCode);
445         }
446     }
447
448     /**
449      * Handles a {@link Forward} event, possibly forwarding it again.
450      *
451      * @param topic2 topic
452      * @param event event, as an object
453      * @param eventHashCode event's hash code
454      * @return {@code true} if the event was handled, {@code false} if the invoker should
455      *         handle it
456      */
457     private boolean handleEvent(String topic2, Object event, int eventHashCode) {
458         String target = assignments.getAssignedHost(eventHashCode);
459
460         if (target == null) {
461             /*
462              * This bucket has no assignment - just discard the event
463              */
464             logger.warn("discarded event for unassigned bucket from topic {}", topic2);
465             return true;
466         }
467
468         if (target.equals(host)) {
469             /*
470              * Message belongs to this host - allow the controller to handle it.
471              */
472             logger.info("handle local event for request {} from topic {}", event, topic2);
473             return false;
474         }
475
476         // not our message, consume the event
477         logger.warn("discarded event for host {} from topic {}", target, topic2);
478         return true;
479     }
480
481     /**
482      * Decodes an event from a String into an event Object.
483      *
484      * @param topic2 topic
485      * @param event event
486      * @return the decoded event object, or {@code null} if it can't be decoded
487      */
488     private Object decodeEvent(String topic2, String event) {
489         DroolsController drools = controller.getDrools();
490
491         // check if this topic has a decoder
492
493         if (!canDecodeEvent(drools, topic2)) {
494
495             logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
496                             drools.getArtifactId());
497             return null;
498         }
499
500         // decode
501
502         try {
503             return decodeEventWrapper(drools, topic2, event);
504
505         } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
506             logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
507             return null;
508         }
509     }
510
511     /**
512      * Handles an event from the internal topic. This uses reflection to identify the
513      * appropriate process() method to invoke, based on the type of Message that was
514      * decoded.
515      *
516      * @param event the serialized {@link Message} read from the internal topic
517      */
518     private void handleInternal(String event) {
519         Class<?> clazz = null;
520
521         try {
522             Message msg = serializer.decodeMsg(event);
523
524             // get the class BEFORE checking the validity
525             clazz = msg.getClass();
526
527             msg.checkValidity();
528
529             Method meth = current.getClass().getMethod("process", msg.getClass());
530             changeState((State) meth.invoke(current, msg));
531
532         } catch (JsonParseException e) {
533             logger.warn("failed to decode message for topic {}", topic, e);
534
535         } catch (NoSuchMethodException | SecurityException e) {
536             logger.error("no processor for message {} for topic {}", clazz, topic, e);
537
538         } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
539                         | PoolingFeatureException e) {
540             logger.error("failed to process message {} for topic {}", clazz, topic, e);
541         }
542     }
543
544     @Override
545     public void startDistributing(BucketAssignments asgn) {
546         synchronized (curLocker) {
547             int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
548             logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
549             assignments = asgn;
550         }
551     }
552
553     @Override
554     public BucketAssignments getAssignments() {
555         return assignments;
556     }
557
558     @Override
559     public State goStart() {
560         return new StartState(this);
561     }
562
563     @Override
564     public State goQuery() {
565         return new QueryState(this);
566     }
567
568     @Override
569     public State goActive() {
570         activeLatch.countDown();
571         return new ActiveState(this);
572     }
573
574     @Override
575     public State goInactive() {
576         return new InactiveState(this);
577     }
578
579     /**
580      * Action to run a timer task. Only runs the task if the machine is still in the state
581      * that it was in when the timer was created.
582      */
583     private class TimerAction implements Runnable {
584
585         /**
586          * State of the machine when the timer was created.
587          */
588         private State origState;
589
590         /**
591          * Task to be executed.
592          */
593         private StateTimerTask task;
594
595         /**
596          * Constructor.
597          *
598          * @param task task to execute when this timer runs
599          */
600         public TimerAction(StateTimerTask task) {
601             this.origState = current;
602             this.task = task;
603         }
604
605         @Override
606         public void run() {
607             synchronized (curLocker) {
608                 if (current == origState) {
609                     changeState(task.fire());
610                 }
611             }
612         }
613     }
614
615     /**
616      * Creates a DMaaP manager.
617      *
618      * @param topic name of the internal DMaaP topic
619      * @return a new DMaaP manager
620      * @throws PoolingFeatureException if an error occurs
621      */
622     protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
623         return new DmaapManager(topic);
624     }
625
626     /**
627      * Creates a scheduled thread pool.
628      *
629      * @return a new scheduled thread pool
630      */
631     protected ScheduledThreadPoolExecutor makeScheduler() {
632         return new ScheduledThreadPoolExecutor(1);
633     }
634
635     /**
636      * Determines if the event can be decoded.
637      *
638      * @param drools drools controller
639      * @param topic topic on which the event was received
640      * @return {@code true} if the event can be decoded, {@code false} otherwise
641      */
642     protected boolean canDecodeEvent(DroolsController drools, String topic) {
643         return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(),
644                         topic);
645     }
646
647     /**
648      * Decodes the event.
649      *
650      * @param drools drools controller
651      * @param topic topic on which the event was received
652      * @param event event text to be decoded
653      * @return the decoded event
654      * @throws IllegalArgumentException illegal argument
655      * @throw UnsupportedOperationException unsupported operation
656      * @throws IllegalStateException illegal state
657      */
658     protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
659         return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic,
660                         event);
661     }
662 }