Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingManagerImpl.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import com.google.gson.JsonParseException;
24 import java.lang.reflect.InvocationTargetException;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import lombok.Getter;
30 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.common.endpoints.event.comm.TopicListener;
32 import org.onap.policy.drools.controller.DroolsController;
33 import org.onap.policy.drools.pooling.message.BucketAssignments;
34 import org.onap.policy.drools.pooling.message.Leader;
35 import org.onap.policy.drools.pooling.message.Message;
36 import org.onap.policy.drools.pooling.message.Offline;
37 import org.onap.policy.drools.pooling.state.ActiveState;
38 import org.onap.policy.drools.pooling.state.IdleState;
39 import org.onap.policy.drools.pooling.state.InactiveState;
40 import org.onap.policy.drools.pooling.state.QueryState;
41 import org.onap.policy.drools.pooling.state.StartState;
42 import org.onap.policy.drools.pooling.state.State;
43 import org.onap.policy.drools.pooling.state.StateTimerTask;
44 import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
45 import org.onap.policy.drools.system.PolicyController;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
51  * events coming from external topics are saved in a queue for later processing. Once
52  * assignments are made, the saved events are processed. In addition, while the controller
53  * is locked, events are still forwarded to other hosts and bucket assignments are still
54  * updated, based on any {@link Leader} messages that it receives.
55  */
56 public class PoolingManagerImpl implements PoolingManager, TopicListener {
57
58     private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
59
60     /**
61      * Maximum number of times a message can be forwarded.
62      */
63     public static final int MAX_HOPS = 5;
64
65     /**
66      * ID of this host.
67      */
68     @Getter
69     private final String host;
70
71     /**
72      * Properties with which this was configured.
73      */
74     @Getter
75     private final PoolingProperties properties;
76
77     /**
78      * Associated controller.
79      */
80     private final PolicyController controller;
81
82     /**
83      * Decremented each time the manager enters the Active state. Used by junit tests.
84      */
85     private final CountDownLatch activeLatch;
86
87     /**
88      * Used to encode & decode request objects received from & sent to a rule engine.
89      */
90     private final Serializer serializer;
91
92     /**
93      * Internal DMaaP topic used by this controller.
94      */
95     @Getter
96     private final String topic;
97
98     /**
99      * Manager for the internal DMaaP topic.
100      */
101     private final DmaapManager dmaapMgr;
102
103     /**
104      * Lock used while updating {@link #current}. In general, public methods must use
105      * this, while private methods assume the lock is already held.
106      */
107     private final Object curLocker = new Object();
108
109     /**
110      * Current state.
111      *
112      * <p>This uses a finite state machine, wherein the state object contains all of the data
113      * relevant to that state. Each state object has a process() method, specific to each
114      * type of {@link Message} subclass. The method returns the next state object, or
115      * {@code null} if the state is to remain the same.
116      */
117     private State current;
118
119     /**
120      * Current bucket assignments or {@code null}.
121      */
122     @Getter
123     private BucketAssignments assignments = null;
124
125     /**
126      * Pool used to execute timers.
127      */
128     private ScheduledThreadPoolExecutor scheduler = null;
129
130     /**
131      * Constructs the manager, initializing all of the data structures.
132      *
133      * @param host name/uuid of this host
134      * @param controller controller with which this is associated
135      * @param props feature properties specific to the controller
136      * @param activeLatch latch to be decremented each time the manager enters the Active
137      *        state
138      */
139     public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
140                     CountDownLatch activeLatch) {
141         this.host = host;
142         this.controller = controller;
143         this.properties = props;
144         this.activeLatch = activeLatch;
145
146         try {
147             this.serializer = new Serializer();
148             this.topic = props.getPoolingTopic();
149             this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
150             this.current = new IdleState(this);
151
152             logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
153
154         } catch (ClassCastException e) {
155             logger.error("not a topic listener, controller {}", controller.getName());
156             throw new PoolingFeatureRtException(e);
157
158         } catch (PoolingFeatureException e) {
159             logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
160             throw new PoolingFeatureRtException(e);
161         }
162     }
163
164     /**
165      * Should only be used by junit tests.
166      *
167      * @return the current state
168      */
169     protected State getCurrent() {
170         synchronized (curLocker) {
171             return current;
172         }
173     }
174
175     /**
176      * Indicates that the controller is about to start. Starts the publisher for the
177      * internal topic, and creates a thread pool for the timers.
178      */
179     public void beforeStart() {
180         synchronized (curLocker) {
181             if (scheduler == null) {
182                 dmaapMgr.startPublisher();
183
184                 logger.debug("make scheduler thread for topic {}", getTopic());
185                 scheduler = makeScheduler();
186
187                 /*
188                  * Only a handful of timers at any moment, thus we can afford to take the
189                  * time to remove them when they're cancelled.
190                  */
191                 scheduler.setRemoveOnCancelPolicy(true);
192                 scheduler.setMaximumPoolSize(1);
193                 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
194                 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
195             }
196         }
197     }
198
199     /**
200      * Indicates that the controller has successfully started. Starts the consumer for the
201      * internal topic, enters the {@link StartState}, and sets the filter for the initial
202      * state.
203      */
204     public void afterStart() {
205         synchronized (curLocker) {
206             if (current instanceof IdleState) {
207                 dmaapMgr.startConsumer(this);
208                 changeState(new StartState(this));
209             }
210         }
211     }
212
213     /**
214      * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
215      * and the current state.
216      */
217     public void beforeStop() {
218         ScheduledThreadPoolExecutor sched;
219
220         synchronized (curLocker) {
221             sched = scheduler;
222             scheduler = null;
223
224             if (!(current instanceof IdleState)) {
225                 changeState(new IdleState(this));
226                 dmaapMgr.stopConsumer(this);
227                 publishAdmin(new Offline(getHost()));
228             }
229
230             assignments = null;
231         }
232
233         if (sched != null) {
234             logger.debug("stop scheduler for topic {}", getTopic());
235             sched.shutdownNow();
236         }
237     }
238
239     /**
240      * Indicates that the controller has stopped. Stops the publisher and logs a warning
241      * if any events are still in the queue.
242      */
243     public void afterStop() {
244         synchronized (curLocker) {
245             /*
246              * stop the publisher, but allow time for any Offline message to be
247              * transmitted
248              */
249             dmaapMgr.stopPublisher(properties.getOfflinePubWaitMs());
250         }
251     }
252
253     /**
254      * Indicates that the controller is about to be locked. Enters the idle state, as all
255      * it will be doing is forwarding messages.
256      */
257     public void beforeLock() {
258         logger.info("locking manager for topic {}", getTopic());
259
260         synchronized (curLocker) {
261             changeState(new IdleState(this));
262         }
263     }
264
265     /**
266      * Indicates that the controller has been unlocked. Enters the start state, if the
267      * controller is running.
268      */
269     public void afterUnlock() {
270         logger.info("unlocking manager for topic {}", getTopic());
271
272         synchronized (curLocker) {
273             if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
274                 changeState(new StartState(this));
275             }
276         }
277     }
278
279     /**
280      * Changes the finite state machine to a new state, provided the new state is not
281      * {@code null}.
282      *
283      * @param newState new state, or {@code null} if to remain unchanged
284      */
285     private void changeState(State newState) {
286         if (newState != null) {
287             current.cancelTimers();
288             current = newState;
289
290             newState.start();
291         }
292     }
293
294     @Override
295     public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
296         // wrap the task in a TimerAction and schedule it
297         ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
298
299         // wrap the future in a "CancellableScheduledTask"
300         return () -> fut.cancel(false);
301     }
302
303     @Override
304     public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
305         // wrap the task in a TimerAction and schedule it
306         ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
307                         TimeUnit.MILLISECONDS);
308
309         // wrap the future in a "CancellableScheduledTask"
310         return () -> fut.cancel(false);
311     }
312
313     @Override
314     public void publishAdmin(Message msg) {
315         publish(Message.ADMIN, msg);
316     }
317
318     @Override
319     public void publish(String channel, Message msg) {
320         logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
321
322         msg.setChannel(channel);
323
324         try {
325             // ensure it's valid before we send it
326             msg.checkValidity();
327
328             String txt = serializer.encodeMsg(msg);
329             dmaapMgr.publish(txt);
330
331         } catch (JsonParseException e) {
332             logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
333
334         } catch (PoolingFeatureException e) {
335             logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
336         }
337     }
338
339     /**
340      * Handles an event from the internal topic.
341      *
342      * @param commType comm infrastructure
343      * @param topic2 topic
344      * @param event event
345      */
346     @Override
347     public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
348
349         if (event == null) {
350             logger.error("null event on topic {}", topic);
351             return;
352         }
353
354         synchronized (curLocker) {
355             // it's on the internal topic
356             handleInternal(event);
357         }
358     }
359
360     /**
361      * Called by the PolicyController before it offers the event to the DroolsController.
362      * If the controller is locked, then it isn't processing events. However, they still
363      * need to be forwarded, thus in that case, they are decoded and forwarded.
364      *
365      * <p>On the other hand, if the controller is not locked, then we just return immediately
366      * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
367      * it instead, as it already has the decoded message.
368      *
369      * @param topic2 topic
370      * @param event event
371      * @return {@code true} if the event was handled by the manager, {@code false} if it
372      *         must still be handled by the invoker
373      */
374     public boolean beforeOffer(String topic2, String event) {
375
376         if (!controller.isLocked()) {
377             // we should NOT intercept this message - let the invoker handle it
378             return false;
379         }
380
381         return handleExternal(topic2, decodeEvent(topic2, event));
382     }
383
384     /**
385      * Called by the DroolsController before it inserts the event into the rule engine.
386      *
387      * @param topic2 topic
388      * @param event event, as an object
389      * @return {@code true} if the event was handled by the manager, {@code false} if it
390      *         must still be handled by the invoker
391      */
392     public boolean beforeInsert(String topic2, Object event) {
393         return handleExternal(topic2, event);
394     }
395
396     /**
397      * Handles an event from an external topic.
398      *
399      * @param topic2 topic
400      * @param event event, as an object, or {@code null} if it cannot be decoded
401      * @return {@code true} if the event was handled by the manager, {@code false} if it
402      *         must still be handled by the invoker
403      */
404     private boolean handleExternal(String topic2, Object event) {
405         if (event == null) {
406             // no event - let the invoker handle it
407             return false;
408         }
409
410         synchronized (curLocker) {
411             return handleExternal(topic2, event, event.hashCode());
412         }
413     }
414
415     /**
416      * Handles an event from an external topic.
417      *
418      * @param topic2 topic
419      * @param event event, as an object
420      * @param eventHashCode event's hash code
421      * @return {@code true} if the event was handled, {@code false} if the invoker should
422      *         handle it
423      */
424     private boolean handleExternal(String topic2, Object event, int eventHashCode) {
425         if (assignments == null) {
426             // no bucket assignments yet - handle locally
427             logger.info("handle event locally for request {}", event);
428
429             // we did NOT consume the event
430             return false;
431
432         } else {
433             return handleEvent(topic2, event, eventHashCode);
434         }
435     }
436
437     /**
438      * Handles a {@link Forward} event, possibly forwarding it again.
439      *
440      * @param topic2 topic
441      * @param event event, as an object
442      * @param eventHashCode event's hash code
443      * @return {@code true} if the event was handled, {@code false} if the invoker should
444      *         handle it
445      */
446     private boolean handleEvent(String topic2, Object event, int eventHashCode) {
447         String target = assignments.getAssignedHost(eventHashCode);
448
449         if (target == null) {
450             /*
451              * This bucket has no assignment - just discard the event
452              */
453             logger.warn("discarded event for unassigned bucket from topic {}", topic2);
454             return true;
455         }
456
457         if (target.equals(host)) {
458             /*
459              * Message belongs to this host - allow the controller to handle it.
460              */
461             logger.info("handle local event for request {} from topic {}", event, topic2);
462             return false;
463         }
464
465         // not our message, consume the event
466         logger.warn("discarded event for host {} from topic {}", target, topic2);
467         return true;
468     }
469
470     /**
471      * Decodes an event from a String into an event Object.
472      *
473      * @param topic2 topic
474      * @param event event
475      * @return the decoded event object, or {@code null} if it can't be decoded
476      */
477     private Object decodeEvent(String topic2, String event) {
478         DroolsController drools = controller.getDrools();
479
480         // check if this topic has a decoder
481
482         if (!canDecodeEvent(drools, topic2)) {
483
484             logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
485                             drools.getArtifactId());
486             return null;
487         }
488
489         // decode
490
491         try {
492             return decodeEventWrapper(drools, topic2, event);
493
494         } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
495             logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
496             return null;
497         }
498     }
499
500     /**
501      * Handles an event from the internal topic. This uses reflection to identify the
502      * appropriate process() method to invoke, based on the type of Message that was
503      * decoded.
504      *
505      * @param event the serialized {@link Message} read from the internal topic
506      */
507     private void handleInternal(String event) {
508         Class<?> clazz = null;
509
510         try {
511             Message msg = serializer.decodeMsg(event);
512
513             // get the class BEFORE checking the validity
514             clazz = msg.getClass();
515
516             msg.checkValidity();
517
518             var meth = current.getClass().getMethod("process", msg.getClass());
519             changeState((State) meth.invoke(current, msg));
520
521         } catch (JsonParseException e) {
522             logger.warn("failed to decode message for topic {}", topic, e);
523
524         } catch (NoSuchMethodException | SecurityException e) {
525             logger.error("no processor for message {} for topic {}", clazz, topic, e);
526
527         } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
528                         | PoolingFeatureException e) {
529             logger.error("failed to process message {} for topic {}", clazz, topic, e);
530         }
531     }
532
533     @Override
534     public void startDistributing(BucketAssignments asgn) {
535         synchronized (curLocker) {
536             int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
537             logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
538             assignments = asgn;
539         }
540     }
541
542     @Override
543     public State goStart() {
544         return new StartState(this);
545     }
546
547     @Override
548     public State goQuery() {
549         return new QueryState(this);
550     }
551
552     @Override
553     public State goActive() {
554         activeLatch.countDown();
555         return new ActiveState(this);
556     }
557
558     @Override
559     public State goInactive() {
560         return new InactiveState(this);
561     }
562
563     /**
564      * Action to run a timer task. Only runs the task if the machine is still in the state
565      * that it was in when the timer was created.
566      */
567     private class TimerAction implements Runnable {
568
569         /**
570          * State of the machine when the timer was created.
571          */
572         private State origState;
573
574         /**
575          * Task to be executed.
576          */
577         private StateTimerTask task;
578
579         /**
580          * Constructor.
581          *
582          * @param task task to execute when this timer runs
583          */
584         public TimerAction(StateTimerTask task) {
585             this.origState = current;
586             this.task = task;
587         }
588
589         @Override
590         public void run() {
591             synchronized (curLocker) {
592                 if (current == origState) {
593                     changeState(task.fire());
594                 }
595             }
596         }
597     }
598
599     /**
600      * Creates a DMaaP manager.
601      *
602      * @param topic name of the internal DMaaP topic
603      * @return a new DMaaP manager
604      * @throws PoolingFeatureException if an error occurs
605      */
606     protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
607         return new DmaapManager(topic);
608     }
609
610     /**
611      * Creates a scheduled thread pool.
612      *
613      * @return a new scheduled thread pool
614      */
615     protected ScheduledThreadPoolExecutor makeScheduler() {
616         return new ScheduledThreadPoolExecutor(1);
617     }
618
619     /**
620      * Determines if the event can be decoded.
621      *
622      * @param drools drools controller
623      * @param topic topic on which the event was received
624      * @return {@code true} if the event can be decoded, {@code false} otherwise
625      */
626     protected boolean canDecodeEvent(DroolsController drools, String topic) {
627         return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(),
628                         topic);
629     }
630
631     /**
632      * Decodes the event.
633      *
634      * @param drools drools controller
635      * @param topic topic on which the event was received
636      * @param event event text to be decoded
637      * @return the decoded event
638      * @throws IllegalArgumentException illegal argument
639      * @throw UnsupportedOperationException unsupported operation
640      * @throws IllegalStateException illegal state
641      */
642     protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
643         return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic,
644                         event);
645     }
646 }