2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling;
24 import com.google.gson.JsonParseException;
25 import java.lang.reflect.InvocationTargetException;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
31 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
32 import org.onap.policy.common.endpoints.event.comm.TopicListener;
33 import org.onap.policy.drools.controller.DroolsController;
34 import org.onap.policy.drools.pooling.message.BucketAssignments;
35 import org.onap.policy.drools.pooling.message.Leader;
36 import org.onap.policy.drools.pooling.message.Message;
37 import org.onap.policy.drools.pooling.message.Offline;
38 import org.onap.policy.drools.pooling.state.ActiveState;
39 import org.onap.policy.drools.pooling.state.IdleState;
40 import org.onap.policy.drools.pooling.state.InactiveState;
41 import org.onap.policy.drools.pooling.state.QueryState;
42 import org.onap.policy.drools.pooling.state.StartState;
43 import org.onap.policy.drools.pooling.state.State;
44 import org.onap.policy.drools.pooling.state.StateTimerTask;
45 import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
46 import org.onap.policy.drools.system.PolicyController;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
51 * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
52 * events coming from external topics are saved in a queue for later processing. Once
53 * assignments are made, the saved events are processed. In addition, while the controller
54 * is locked, events are still forwarded to other hosts and bucket assignments are still
55 * updated, based on any {@link Leader} messages that it receives.
57 public class PoolingManagerImpl implements PoolingManager, TopicListener {
59 private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
62 * Maximum number of times a message can be forwarded.
64 public static final int MAX_HOPS = 5;
70 private final String host;
73 * Properties with which this was configured.
76 private final PoolingProperties properties;
79 * Associated controller.
81 private final PolicyController controller;
84 * Decremented each time the manager enters the Active state. Used by junit tests.
86 private final CountDownLatch activeLatch;
89 * Used to encode & decode request objects received from & sent to a rule engine.
91 private final Serializer serializer;
94 * Internal DMaaP topic used by this controller.
97 private final String topic;
100 * Manager for the internal DMaaP topic.
102 private final TopicMessageManager topicMessageManager;
105 * Lock used while updating {@link #current}. In general, public methods must use
106 * this, while private methods assume the lock is already held.
108 private final Object curLocker = new Object();
113 * <p>This uses a finite state machine, wherein the state object contains all of the data
114 * relevant to that state. Each state object has a process() method, specific to each
115 * type of {@link Message} subclass. The method returns the next state object, or
116 * {@code null} if the state is to remain the same.
118 private State current;
121 * Current bucket assignments or {@code null}.
124 private BucketAssignments assignments = null;
127 * Pool used to execute timers.
129 private ScheduledThreadPoolExecutor scheduler = null;
132 * Constructs the manager, initializing all the data structures.
134 * @param host name/uuid of this host
135 * @param controller controller with which this is associated
136 * @param props feature properties specific to the controller
137 * @param activeLatch latch to be decremented each time the manager enters the Active
140 public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
141 CountDownLatch activeLatch) {
143 this.controller = controller;
144 this.properties = props;
145 this.activeLatch = activeLatch;
148 this.serializer = new Serializer();
149 this.topic = props.getPoolingTopic();
150 this.topicMessageManager = makeTopicMessagesManager(props.getPoolingTopic());
151 this.current = new IdleState(this);
153 logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
155 } catch (ClassCastException e) {
156 logger.error("not a topic listener, controller {}", controller.getName());
157 throw new PoolingFeatureRtException(e);
159 } catch (PoolingFeatureException e) {
160 logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
161 throw new PoolingFeatureRtException(e);
166 * Should only be used by junit tests.
168 * @return the current state
170 protected State getCurrent() {
171 synchronized (curLocker) {
177 * Indicates that the controller is about to start. Starts the publisher for the
178 * internal topic, and creates a thread pool for the timers.
180 public void beforeStart() {
181 synchronized (curLocker) {
182 if (scheduler == null) {
183 topicMessageManager.startPublisher();
185 logger.debug("make scheduler thread for topic {}", getTopic());
186 scheduler = makeScheduler();
189 * Only a handful of timers at any moment, thus we can afford to take the
190 * time to remove them when they're cancelled.
192 scheduler.setRemoveOnCancelPolicy(true);
193 scheduler.setMaximumPoolSize(1);
194 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
195 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
201 * Indicates that the controller has successfully started. Starts the consumer for the
202 * internal topic, enters the {@link StartState}, and sets the filter for the initial
205 public void afterStart() {
206 synchronized (curLocker) {
207 if (current instanceof IdleState) {
208 topicMessageManager.startConsumer(this);
209 changeState(new StartState(this));
215 * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
216 * and the current state.
218 public void beforeStop() {
219 ScheduledThreadPoolExecutor sched;
221 synchronized (curLocker) {
225 if (!(current instanceof IdleState)) {
226 changeState(new IdleState(this));
227 topicMessageManager.stopConsumer(this);
228 publishAdmin(new Offline(getHost()));
235 logger.debug("stop scheduler for topic {}", getTopic());
241 * Indicates that the controller has stopped. Stops the publisher and logs a warning
242 * if any events are still in the queue.
244 public void afterStop() {
245 synchronized (curLocker) {
247 * stop the publisher, but allow time for any Offline message to be
250 topicMessageManager.stopPublisher(properties.getOfflinePubWaitMs());
255 * Indicates that the controller is about to be locked. Enters the idle state, as all
256 * it will be doing is forwarding messages.
258 public void beforeLock() {
259 logger.info("locking manager for topic {}", getTopic());
261 synchronized (curLocker) {
262 changeState(new IdleState(this));
267 * Indicates that the controller has been unlocked. Enters the start state, if the
268 * controller is running.
270 public void afterUnlock() {
271 logger.info("unlocking manager for topic {}", getTopic());
273 synchronized (curLocker) {
274 if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
275 changeState(new StartState(this));
281 * Changes the finite state machine to a new state, provided the new state is not
284 * @param newState new state, or {@code null} if to remain unchanged
286 private void changeState(State newState) {
287 if (newState != null) {
288 current.cancelTimers();
296 public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
297 // wrap the task in a TimerAction and schedule it
298 ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
300 // wrap the future in a "CancellableScheduledTask"
301 return () -> fut.cancel(false);
305 public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
306 // wrap the task in a TimerAction and schedule it
307 ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
308 TimeUnit.MILLISECONDS);
310 // wrap the future in a "CancellableScheduledTask"
311 return () -> fut.cancel(false);
315 public void publishAdmin(Message msg) {
316 publish(Message.ADMIN, msg);
320 public void publish(String channel, Message msg) {
321 logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
323 msg.setChannel(channel);
326 // ensure it's valid before we send it
329 String txt = serializer.encodeMsg(msg);
330 topicMessageManager.publish(txt);
332 } catch (JsonParseException e) {
333 logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
335 } catch (PoolingFeatureException e) {
336 logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
341 * Handles an event from the internal topic.
343 * @param commType comm infrastructure
344 * @param topic2 topic
348 public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
351 logger.error("null event on topic {}", topic);
355 synchronized (curLocker) {
356 // it's on the internal topic
357 handleInternal(event);
362 * Called by the PolicyController before it offers the event to the DroolsController.
363 * If the controller is locked, then it isn't processing events. However, they still
364 * need to be forwarded, thus in that case, they are decoded and forwarded.
366 * <p>On the other hand, if the controller is not locked, then we just return immediately
367 * and let {@link #beforeInsert(String, Object) beforeInsert()} handle
368 * it instead, as it already has the decoded message.
370 * @param topic2 topic
372 * @return {@code true} if the event was handled by the manager, {@code false} if it
373 * must still be handled by the invoker
375 public boolean beforeOffer(String topic2, String event) {
377 if (!controller.isLocked()) {
378 // we should NOT intercept this message - let the invoker handle it
382 return handleExternal(topic2, decodeEvent(topic2, event));
386 * Called by the DroolsController before it inserts the event into the rule engine.
388 * @param topic2 topic
389 * @param event event, as an object
390 * @return {@code true} if the event was handled by the manager, {@code false} if it
391 * must still be handled by the invoker
393 public boolean beforeInsert(String topic2, Object event) {
394 return handleExternal(topic2, event);
398 * Handles an event from an external topic.
400 * @param topic2 topic
401 * @param event event, as an object, or {@code null} if it cannot be decoded
402 * @return {@code true} if the event was handled by the manager, {@code false} if it
403 * must still be handled by the invoker
405 private boolean handleExternal(String topic2, Object event) {
407 // no event - let the invoker handle it
411 synchronized (curLocker) {
412 return handleExternal(topic2, event, event.hashCode());
417 * Handles an event from an external topic.
419 * @param topic2 topic
420 * @param event event, as an object
421 * @param eventHashCode event's hash code
422 * @return {@code true} if the event was handled, {@code false} if the invoker should
425 private boolean handleExternal(String topic2, Object event, int eventHashCode) {
426 if (assignments == null) {
427 // no bucket assignments yet - handle locally
428 logger.info("handle event locally for request {}", event);
430 // we did NOT consume the event
434 return handleEvent(topic2, event, eventHashCode);
439 * Handles a {@link Forward} event, possibly forwarding it again.
441 * @param topic2 topic
442 * @param event event, as an object
443 * @param eventHashCode event's hash code
444 * @return {@code true} if the event was handled, {@code false} if the invoker should
447 private boolean handleEvent(String topic2, Object event, int eventHashCode) {
448 String target = assignments.getAssignedHost(eventHashCode);
450 if (target == null) {
452 * This bucket has no assignment - just discard the event
454 logger.warn("discarded event for unassigned bucket from topic {}", topic2);
458 if (target.equals(host)) {
460 * Message belongs to this host - allow the controller to handle it.
462 logger.info("handle local event for request {} from topic {}", event, topic2);
466 // not our message, consume the event
467 logger.warn("discarded event for host {} from topic {}", target, topic2);
472 * Decodes an event from a String into an event Object.
474 * @param topic2 topic
476 * @return the decoded event object, or {@code null} if it can't be decoded
478 private Object decodeEvent(String topic2, String event) {
479 DroolsController drools = controller.getDrools();
481 // check if this topic has a decoder
483 if (!canDecodeEvent(drools, topic2)) {
485 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
486 drools.getArtifactId());
493 return decodeEventWrapper(drools, topic2, event);
495 } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
496 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
502 * Handles an event from the internal topic. This uses reflection to identify the
503 * appropriate process() method to invoke, based on the type of Message that was
506 * @param event the serialized {@link Message} read from the internal topic
508 private void handleInternal(String event) {
509 Class<?> clazz = null;
512 Message msg = serializer.decodeMsg(event);
514 // get the class BEFORE checking the validity
515 clazz = msg.getClass();
519 var meth = current.getClass().getMethod("process", msg.getClass());
520 changeState((State) meth.invoke(current, msg));
522 } catch (JsonParseException e) {
523 logger.warn("failed to decode message for topic {}", topic, e);
525 } catch (NoSuchMethodException | SecurityException e) {
526 logger.error("no processor for message {} for topic {}", clazz, topic, e);
528 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
529 | PoolingFeatureException e) {
530 logger.error("failed to process message {} for topic {}", clazz, topic, e);
535 public void startDistributing(BucketAssignments asgn) {
536 synchronized (curLocker) {
537 int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
538 logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
544 public State goStart() {
545 return new StartState(this);
549 public State goQuery() {
550 return new QueryState(this);
554 public State goActive() {
555 activeLatch.countDown();
556 return new ActiveState(this);
560 public State goInactive() {
561 return new InactiveState(this);
565 * Action to run a timer task. Only runs the task if the machine is still in the state
566 * that it was in when the timer was created.
568 private class TimerAction implements Runnable {
571 * State of the machine when the timer was created.
573 private State origState;
576 * Task to be executed.
578 private StateTimerTask task;
583 * @param task task to execute when this timer runs
585 public TimerAction(StateTimerTask task) {
586 this.origState = current;
592 synchronized (curLocker) {
593 if (current == origState) {
594 changeState(task.fire());
601 * Creates a DMaaP manager.
603 * @param topic name of the internal DMaaP topic
604 * @return a new topic messages manager
605 * @throws PoolingFeatureException if an error occurs
607 protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
608 return new TopicMessageManager(topic);
612 * Creates a scheduled thread pool.
614 * @return a new scheduled thread pool
616 protected ScheduledThreadPoolExecutor makeScheduler() {
617 return new ScheduledThreadPoolExecutor(1);
621 * Determines if the event can be decoded.
623 * @param drools drools controller
624 * @param topic topic on which the event was received
625 * @return {@code true} if the event can be decoded, {@code false} otherwise
627 protected boolean canDecodeEvent(DroolsController drools, String topic) {
628 return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(),
635 * @param drools drools controller
636 * @param topic topic on which the event was received
637 * @param event event text to be decoded
638 * @return the decoded event
639 * @throws IllegalArgumentException illegal argument
640 * @throws UnsupportedOperationException unsupported operation
641 * @throws IllegalStateException illegal state
643 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
644 return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic,