Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-messages / src / main / java / org / onap / policy / drools / pooling / PoolingManagerImpl.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling;
23
24 import com.google.gson.JsonParseException;
25 import java.lang.reflect.InvocationTargetException;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import lombok.Getter;
31 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
32 import org.onap.policy.common.endpoints.event.comm.TopicListener;
33 import org.onap.policy.drools.controller.DroolsController;
34 import org.onap.policy.drools.pooling.message.BucketAssignments;
35 import org.onap.policy.drools.pooling.message.Leader;
36 import org.onap.policy.drools.pooling.message.Message;
37 import org.onap.policy.drools.pooling.message.Offline;
38 import org.onap.policy.drools.pooling.state.ActiveState;
39 import org.onap.policy.drools.pooling.state.IdleState;
40 import org.onap.policy.drools.pooling.state.InactiveState;
41 import org.onap.policy.drools.pooling.state.QueryState;
42 import org.onap.policy.drools.pooling.state.StartState;
43 import org.onap.policy.drools.pooling.state.State;
44 import org.onap.policy.drools.pooling.state.StateTimerTask;
45 import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
46 import org.onap.policy.drools.system.PolicyController;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
52  * events coming from external topics are saved in a queue for later processing. Once
53  * assignments are made, the saved events are processed. In addition, while the controller
54  * is locked, events are still forwarded to other hosts and bucket assignments are still
55  * updated, based on any {@link Leader} messages that it receives.
56  */
57 public class PoolingManagerImpl implements PoolingManager, TopicListener {
58
59     private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
60
61     /**
62      * Maximum number of times a message can be forwarded.
63      */
64     public static final int MAX_HOPS = 5;
65
66     /**
67      * ID of this host.
68      */
69     @Getter
70     private final String host;
71
72     /**
73      * Properties with which this was configured.
74      */
75     @Getter
76     private final PoolingProperties properties;
77
78     /**
79      * Associated controller.
80      */
81     private final PolicyController controller;
82
83     /**
84      * Decremented each time the manager enters the Active state. Used by junit tests.
85      */
86     private final CountDownLatch activeLatch;
87
88     /**
89      * Used to encode & decode request objects received from & sent to a rule engine.
90      */
91     private final Serializer serializer;
92
93     /**
94      * Internal DMaaP topic used by this controller.
95      */
96     @Getter
97     private final String topic;
98
99     /**
100      * Manager for the internal DMaaP topic.
101      */
102     private final TopicMessageManager topicMessageManager;
103
104     /**
105      * Lock used while updating {@link #current}. In general, public methods must use
106      * this, while private methods assume the lock is already held.
107      */
108     private final Object curLocker = new Object();
109
110     /**
111      * Current state.
112      *
113      * <p>This uses a finite state machine, wherein the state object contains all of the data
114      * relevant to that state. Each state object has a process() method, specific to each
115      * type of {@link Message} subclass. The method returns the next state object, or
116      * {@code null} if the state is to remain the same.
117      */
118     private State current;
119
120     /**
121      * Current bucket assignments or {@code null}.
122      */
123     @Getter
124     private BucketAssignments assignments = null;
125
126     /**
127      * Pool used to execute timers.
128      */
129     private ScheduledThreadPoolExecutor scheduler = null;
130
131     /**
132      * Constructs the manager, initializing all the data structures.
133      *
134      * @param host name/uuid of this host
135      * @param controller controller with which this is associated
136      * @param props feature properties specific to the controller
137      * @param activeLatch latch to be decremented each time the manager enters the Active
138      *        state
139      */
140     public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
141                     CountDownLatch activeLatch) {
142         this.host = host;
143         this.controller = controller;
144         this.properties = props;
145         this.activeLatch = activeLatch;
146
147         try {
148             this.serializer = new Serializer();
149             this.topic = props.getPoolingTopic();
150             this.topicMessageManager = makeTopicMessagesManager(props.getPoolingTopic());
151             this.current = new IdleState(this);
152
153             logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
154
155         } catch (ClassCastException e) {
156             logger.error("not a topic listener, controller {}", controller.getName());
157             throw new PoolingFeatureRtException(e);
158
159         } catch (PoolingFeatureException e) {
160             logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
161             throw new PoolingFeatureRtException(e);
162         }
163     }
164
165     /**
166      * Should only be used by junit tests.
167      *
168      * @return the current state
169      */
170     protected State getCurrent() {
171         synchronized (curLocker) {
172             return current;
173         }
174     }
175
176     /**
177      * Indicates that the controller is about to start. Starts the publisher for the
178      * internal topic, and creates a thread pool for the timers.
179      */
180     public void beforeStart() {
181         synchronized (curLocker) {
182             if (scheduler == null) {
183                 topicMessageManager.startPublisher();
184
185                 logger.debug("make scheduler thread for topic {}", getTopic());
186                 scheduler = makeScheduler();
187
188                 /*
189                  * Only a handful of timers at any moment, thus we can afford to take the
190                  * time to remove them when they're cancelled.
191                  */
192                 scheduler.setRemoveOnCancelPolicy(true);
193                 scheduler.setMaximumPoolSize(1);
194                 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
195                 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
196             }
197         }
198     }
199
200     /**
201      * Indicates that the controller has successfully started. Starts the consumer for the
202      * internal topic, enters the {@link StartState}, and sets the filter for the initial
203      * state.
204      */
205     public void afterStart() {
206         synchronized (curLocker) {
207             if (current instanceof IdleState) {
208                 topicMessageManager.startConsumer(this);
209                 changeState(new StartState(this));
210             }
211         }
212     }
213
214     /**
215      * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
216      * and the current state.
217      */
218     public void beforeStop() {
219         ScheduledThreadPoolExecutor sched;
220
221         synchronized (curLocker) {
222             sched = scheduler;
223             scheduler = null;
224
225             if (!(current instanceof IdleState)) {
226                 changeState(new IdleState(this));
227                 topicMessageManager.stopConsumer(this);
228                 publishAdmin(new Offline(getHost()));
229             }
230
231             assignments = null;
232         }
233
234         if (sched != null) {
235             logger.debug("stop scheduler for topic {}", getTopic());
236             sched.shutdownNow();
237         }
238     }
239
240     /**
241      * Indicates that the controller has stopped. Stops the publisher and logs a warning
242      * if any events are still in the queue.
243      */
244     public void afterStop() {
245         synchronized (curLocker) {
246             /*
247              * stop the publisher, but allow time for any Offline message to be
248              * transmitted
249              */
250             topicMessageManager.stopPublisher(properties.getOfflinePubWaitMs());
251         }
252     }
253
254     /**
255      * Indicates that the controller is about to be locked. Enters the idle state, as all
256      * it will be doing is forwarding messages.
257      */
258     public void beforeLock() {
259         logger.info("locking manager for topic {}", getTopic());
260
261         synchronized (curLocker) {
262             changeState(new IdleState(this));
263         }
264     }
265
266     /**
267      * Indicates that the controller has been unlocked. Enters the start state, if the
268      * controller is running.
269      */
270     public void afterUnlock() {
271         logger.info("unlocking manager for topic {}", getTopic());
272
273         synchronized (curLocker) {
274             if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
275                 changeState(new StartState(this));
276             }
277         }
278     }
279
280     /**
281      * Changes the finite state machine to a new state, provided the new state is not
282      * {@code null}.
283      *
284      * @param newState new state, or {@code null} if to remain unchanged
285      */
286     private void changeState(State newState) {
287         if (newState != null) {
288             current.cancelTimers();
289             current = newState;
290
291             newState.start();
292         }
293     }
294
295     @Override
296     public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
297         // wrap the task in a TimerAction and schedule it
298         ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
299
300         // wrap the future in a "CancellableScheduledTask"
301         return () -> fut.cancel(false);
302     }
303
304     @Override
305     public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
306         // wrap the task in a TimerAction and schedule it
307         ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
308                         TimeUnit.MILLISECONDS);
309
310         // wrap the future in a "CancellableScheduledTask"
311         return () -> fut.cancel(false);
312     }
313
314     @Override
315     public void publishAdmin(Message msg) {
316         publish(Message.ADMIN, msg);
317     }
318
319     @Override
320     public void publish(String channel, Message msg) {
321         logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
322
323         msg.setChannel(channel);
324
325         try {
326             // ensure it's valid before we send it
327             msg.checkValidity();
328
329             String txt = serializer.encodeMsg(msg);
330             topicMessageManager.publish(txt);
331
332         } catch (JsonParseException e) {
333             logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
334
335         } catch (PoolingFeatureException e) {
336             logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
337         }
338     }
339
340     /**
341      * Handles an event from the internal topic.
342      *
343      * @param commType comm infrastructure
344      * @param topic2 topic
345      * @param event event
346      */
347     @Override
348     public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
349
350         if (event == null) {
351             logger.error("null event on topic {}", topic);
352             return;
353         }
354
355         synchronized (curLocker) {
356             // it's on the internal topic
357             handleInternal(event);
358         }
359     }
360
361     /**
362      * Called by the PolicyController before it offers the event to the DroolsController.
363      * If the controller is locked, then it isn't processing events. However, they still
364      * need to be forwarded, thus in that case, they are decoded and forwarded.
365      *
366      * <p>On the other hand, if the controller is not locked, then we just return immediately
367      * and let {@link #beforeInsert(String, Object)  beforeInsert()} handle
368      * it instead, as it already has the decoded message.
369      *
370      * @param topic2 topic
371      * @param event event
372      * @return {@code true} if the event was handled by the manager, {@code false} if it
373      *         must still be handled by the invoker
374      */
375     public boolean beforeOffer(String topic2, String event) {
376
377         if (!controller.isLocked()) {
378             // we should NOT intercept this message - let the invoker handle it
379             return false;
380         }
381
382         return handleExternal(topic2, decodeEvent(topic2, event));
383     }
384
385     /**
386      * Called by the DroolsController before it inserts the event into the rule engine.
387      *
388      * @param topic2 topic
389      * @param event event, as an object
390      * @return {@code true} if the event was handled by the manager, {@code false} if it
391      *         must still be handled by the invoker
392      */
393     public boolean beforeInsert(String topic2, Object event) {
394         return handleExternal(topic2, event);
395     }
396
397     /**
398      * Handles an event from an external topic.
399      *
400      * @param topic2 topic
401      * @param event event, as an object, or {@code null} if it cannot be decoded
402      * @return {@code true} if the event was handled by the manager, {@code false} if it
403      *         must still be handled by the invoker
404      */
405     private boolean handleExternal(String topic2, Object event) {
406         if (event == null) {
407             // no event - let the invoker handle it
408             return false;
409         }
410
411         synchronized (curLocker) {
412             return handleExternal(topic2, event, event.hashCode());
413         }
414     }
415
416     /**
417      * Handles an event from an external topic.
418      *
419      * @param topic2 topic
420      * @param event event, as an object
421      * @param eventHashCode event's hash code
422      * @return {@code true} if the event was handled, {@code false} if the invoker should
423      *         handle it
424      */
425     private boolean handleExternal(String topic2, Object event, int eventHashCode) {
426         if (assignments == null) {
427             // no bucket assignments yet - handle locally
428             logger.info("handle event locally for request {}", event);
429
430             // we did NOT consume the event
431             return false;
432
433         } else {
434             return handleEvent(topic2, event, eventHashCode);
435         }
436     }
437
438     /**
439      * Handles a {@link Forward} event, possibly forwarding it again.
440      *
441      * @param topic2 topic
442      * @param event event, as an object
443      * @param eventHashCode event's hash code
444      * @return {@code true} if the event was handled, {@code false} if the invoker should
445      *         handle it
446      */
447     private boolean handleEvent(String topic2, Object event, int eventHashCode) {
448         String target = assignments.getAssignedHost(eventHashCode);
449
450         if (target == null) {
451             /*
452              * This bucket has no assignment - just discard the event
453              */
454             logger.warn("discarded event for unassigned bucket from topic {}", topic2);
455             return true;
456         }
457
458         if (target.equals(host)) {
459             /*
460              * Message belongs to this host - allow the controller to handle it.
461              */
462             logger.info("handle local event for request {} from topic {}", event, topic2);
463             return false;
464         }
465
466         // not our message, consume the event
467         logger.warn("discarded event for host {} from topic {}", target, topic2);
468         return true;
469     }
470
471     /**
472      * Decodes an event from a String into an event Object.
473      *
474      * @param topic2 topic
475      * @param event event
476      * @return the decoded event object, or {@code null} if it can't be decoded
477      */
478     private Object decodeEvent(String topic2, String event) {
479         DroolsController drools = controller.getDrools();
480
481         // check if this topic has a decoder
482
483         if (!canDecodeEvent(drools, topic2)) {
484
485             logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
486                             drools.getArtifactId());
487             return null;
488         }
489
490         // decode
491
492         try {
493             return decodeEventWrapper(drools, topic2, event);
494
495         } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
496             logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
497             return null;
498         }
499     }
500
501     /**
502      * Handles an event from the internal topic. This uses reflection to identify the
503      * appropriate process() method to invoke, based on the type of Message that was
504      * decoded.
505      *
506      * @param event the serialized {@link Message} read from the internal topic
507      */
508     private void handleInternal(String event) {
509         Class<?> clazz = null;
510
511         try {
512             Message msg = serializer.decodeMsg(event);
513
514             // get the class BEFORE checking the validity
515             clazz = msg.getClass();
516
517             msg.checkValidity();
518
519             var meth = current.getClass().getMethod("process", msg.getClass());
520             changeState((State) meth.invoke(current, msg));
521
522         } catch (JsonParseException e) {
523             logger.warn("failed to decode message for topic {}", topic, e);
524
525         } catch (NoSuchMethodException | SecurityException e) {
526             logger.error("no processor for message {} for topic {}", clazz, topic, e);
527
528         } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
529                         | PoolingFeatureException e) {
530             logger.error("failed to process message {} for topic {}", clazz, topic, e);
531         }
532     }
533
534     @Override
535     public void startDistributing(BucketAssignments asgn) {
536         synchronized (curLocker) {
537             int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
538             logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
539             assignments = asgn;
540         }
541     }
542
543     @Override
544     public State goStart() {
545         return new StartState(this);
546     }
547
548     @Override
549     public State goQuery() {
550         return new QueryState(this);
551     }
552
553     @Override
554     public State goActive() {
555         activeLatch.countDown();
556         return new ActiveState(this);
557     }
558
559     @Override
560     public State goInactive() {
561         return new InactiveState(this);
562     }
563
564     /**
565      * Action to run a timer task. Only runs the task if the machine is still in the state
566      * that it was in when the timer was created.
567      */
568     private class TimerAction implements Runnable {
569
570         /**
571          * State of the machine when the timer was created.
572          */
573         private State origState;
574
575         /**
576          * Task to be executed.
577          */
578         private StateTimerTask task;
579
580         /**
581          * Constructor.
582          *
583          * @param task task to execute when this timer runs
584          */
585         public TimerAction(StateTimerTask task) {
586             this.origState = current;
587             this.task = task;
588         }
589
590         @Override
591         public void run() {
592             synchronized (curLocker) {
593                 if (current == origState) {
594                     changeState(task.fire());
595                 }
596             }
597         }
598     }
599
600     /**
601      * Creates a DMaaP manager.
602      *
603      * @param topic name of the internal DMaaP topic
604      * @return a new topic messages manager
605      * @throws PoolingFeatureException if an error occurs
606      */
607     protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
608         return new TopicMessageManager(topic);
609     }
610
611     /**
612      * Creates a scheduled thread pool.
613      *
614      * @return a new scheduled thread pool
615      */
616     protected ScheduledThreadPoolExecutor makeScheduler() {
617         return new ScheduledThreadPoolExecutor(1);
618     }
619
620     /**
621      * Determines if the event can be decoded.
622      *
623      * @param drools drools controller
624      * @param topic topic on which the event was received
625      * @return {@code true} if the event can be decoded, {@code false} otherwise
626      */
627     protected boolean canDecodeEvent(DroolsController drools, String topic) {
628         return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(),
629                         topic);
630     }
631
632     /**
633      * Decodes the event.
634      *
635      * @param drools drools controller
636      * @param topic topic on which the event was received
637      * @param event event text to be decoded
638      * @return the decoded event
639      * @throws IllegalArgumentException illegal argument
640      * @throws UnsupportedOperationException unsupported operation
641      * @throws IllegalStateException illegal state
642      */
643     protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
644         return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic,
645                         event);
646     }
647 }