2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import com.google.gson.JsonParseException;
24 import java.lang.reflect.InvocationTargetException;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
30 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.common.endpoints.event.comm.TopicListener;
32 import org.onap.policy.drools.controller.DroolsController;
33 import org.onap.policy.drools.pooling.message.BucketAssignments;
34 import org.onap.policy.drools.pooling.message.Leader;
35 import org.onap.policy.drools.pooling.message.Message;
36 import org.onap.policy.drools.pooling.message.Offline;
37 import org.onap.policy.drools.pooling.state.ActiveState;
38 import org.onap.policy.drools.pooling.state.IdleState;
39 import org.onap.policy.drools.pooling.state.InactiveState;
40 import org.onap.policy.drools.pooling.state.QueryState;
41 import org.onap.policy.drools.pooling.state.StartState;
42 import org.onap.policy.drools.pooling.state.State;
43 import org.onap.policy.drools.pooling.state.StateTimerTask;
44 import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
45 import org.onap.policy.drools.system.PolicyController;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
51 * events coming from external topics are saved in a queue for later processing. Once
52 * assignments are made, the saved events are processed. In addition, while the controller
53 * is locked, events are still forwarded to other hosts and bucket assignments are still
54 * updated, based on any {@link Leader} messages that it receives.
56 public class PoolingManagerImpl implements PoolingManager, TopicListener {
58 private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
61 * Maximum number of times a message can be forwarded.
63 public static final int MAX_HOPS = 5;
69 private final String host;
72 * Properties with which this was configured.
75 private final PoolingProperties properties;
78 * Associated controller.
80 private final PolicyController controller;
83 * Decremented each time the manager enters the Active state. Used by junit tests.
85 private final CountDownLatch activeLatch;
88 * Used to encode & decode request objects received from & sent to a rule engine.
90 private final Serializer serializer;
93 * Internal DMaaP topic used by this controller.
96 private final String topic;
99 * Manager for the internal DMaaP topic.
101 private final DmaapManager dmaapMgr;
104 * Lock used while updating {@link #current}. In general, public methods must use
105 * this, while private methods assume the lock is already held.
107 private final Object curLocker = new Object();
112 * <p>This uses a finite state machine, wherein the state object contains all of the data
113 * relevant to that state. Each state object has a process() method, specific to each
114 * type of {@link Message} subclass. The method returns the next state object, or
115 * {@code null} if the state is to remain the same.
117 private State current;
120 * Current bucket assignments or {@code null}.
123 private BucketAssignments assignments = null;
126 * Pool used to execute timers.
128 private ScheduledThreadPoolExecutor scheduler = null;
131 * Constructs the manager, initializing all of the data structures.
133 * @param host name/uuid of this host
134 * @param controller controller with which this is associated
135 * @param props feature properties specific to the controller
136 * @param activeLatch latch to be decremented each time the manager enters the Active state
// Wires up the manager: stores the controller, properties and latch, then builds the
// serializer and the manager for the internal topic, and starts out in an IdleState.
// NOTE(review): no "try {" precedes the catch clauses below, and the host parameter is
// never stored in the lines shown - this extract appears to be missing lines; confirm
// against the original file.
139 public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
140 CountDownLatch activeLatch) {
142 this.controller = controller;
143 this.properties = props;
144 this.activeLatch = activeLatch;
147 this.serializer = new Serializer();
// the internal topic name comes from the feature properties
148 this.topic = props.getPoolingTopic();
149 this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
150 this.current = new IdleState(this);
152 logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
// both failure paths log the controller name and rethrow wrapped in a PoolingFeatureRtException
154 } catch (ClassCastException e) {
155 logger.error("not a topic listener, controller {}", controller.getName());
156 throw new PoolingFeatureRtException(e);
158 } catch (PoolingFeatureException e) {
159 logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
160 throw new PoolingFeatureRtException(e);
165 * Should only be used by junit tests.
167 * @return the current state
// Takes curLocker before accessing the state.
// NOTE(review): the statement that returns the state is not visible in this extract.
169 protected State getCurrent() {
170 synchronized (curLocker) {
176 * Indicates that the controller is about to start. Starts the publisher for the
177 * internal topic, and creates a thread pool for the timers.
// Set-up performed under curLocker; it only runs while no scheduler exists yet.
179 public void beforeStart() {
180 synchronized (curLocker) {
181 if (scheduler == null) {
// start the publisher before creating the timer pool
182 dmaapMgr.startPublisher();
184 logger.debug("make scheduler thread for topic {}", getTopic());
// created via the protected factory method makeScheduler()
185 scheduler = makeScheduler();
188 * Only a handful of timers at any moment, thus we can afford to take the
189 * time to remove them when they're cancelled.
191 scheduler.setRemoveOnCancelPolicy(true);
// cap the pool at a single thread
192 scheduler.setMaximumPoolSize(1);
// once the scheduler is shut down, neither delayed nor periodic tasks keep running
193 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
194 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
200 * Indicates that the controller has successfully started. Starts the consumer for the
201 * internal topic, enters the {@link StartState}, and sets the filter for the initial
// Only acts when the current state is an IdleState; the check and the transition both
// happen under curLocker.
204 public void afterStart() {
205 synchronized (curLocker) {
206 if (current instanceof IdleState) {
// this manager registers itself as the listener (it implements TopicListener)
207 dmaapMgr.startConsumer(this);
208 changeState(new StartState(this));
214 * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
215 * and the current state.
// Unless the machine is already idle: switches to a new IdleState, stops the consumer
// and publishes an Offline message on the admin channel.
// NOTE(review): "sched" is declared but never assigned or used in the lines shown, so the
// scheduler hand-off/shutdown presumably sits on lines missing from this extract - confirm.
217 public void beforeStop() {
218 ScheduledThreadPoolExecutor sched;
220 synchronized (curLocker) {
224 if (!(current instanceof IdleState)) {
225 changeState(new IdleState(this));
226 dmaapMgr.stopConsumer(this);
// the Offline message is built from getHost()
227 publishAdmin(new Offline(getHost()));
234 logger.debug("stop scheduler for topic {}", getTopic());
240 * Indicates that the controller has stopped. Stops the publisher and logs a warning
241 * if any events are still in the queue.
// Stops the publisher while holding curLocker.
243 public void afterStop() {
244 synchronized (curLocker) {
246 * stop the publisher, but allow time for any Offline message to be
// the wait time handed to stopPublisher() is read from the configured properties
249 dmaapMgr.stopPublisher(properties.getOfflinePubWaitMs());
254 * Indicates that the controller is about to be locked. Enters the idle state, as all
255 * it will be doing is forwarding messages.
// Logs the lock request, then, under curLocker, switches the machine to a new IdleState.
257 public void beforeLock() {
258 logger.info("locking manager for topic {}", getTopic());
260 synchronized (curLocker) {
261 changeState(new IdleState(this));
266 * Indicates that the controller has been unlocked. Enters the start state, if the
267 * controller is running.
// Logs the unlock request, then restarts the state machine - but only when the controller
// is alive, the current state is an IdleState and a scheduler exists.
269 public void afterUnlock() {
270 logger.info("unlocking manager for topic {}", getTopic());
272 synchronized (curLocker) {
// scheduler is non-null only once beforeStart() has created it
273 if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
274 changeState(new StartState(this));
280 * Changes the finite state machine to a new state, provided the new state is not {@code null}.
283 * @param newState new state, or {@code null} if to remain unchanged
// A null newState means "remain in the current state", so nothing is done in that case.
// Private helper: callers are expected to hold curLocker already.
// NOTE(review): the lines that install and start the new state are not visible in this extract.
285 private void changeState(State newState) {
286 if (newState != null) {
// cancel the timers of the state that is being left
287 current.cancelTimers();
// Schedules a one-shot timer, delayMs milliseconds from now, on the scheduler field.
// The returned handle cancels the future with cancel(false), i.e. without interrupting
// a run that is already in progress.
// NOTE(review): scheduler is null until beforeStart() has run, and this method does not
// take curLocker - confirm callers only schedule after start-up.
295 public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
296 // wrap the task in a TimerAction and schedule it
297 ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
299 // wrap the future in a "CancellableScheduledTask"
300 return () -> fut.cancel(false);
// Schedules a repeating timer on the scheduler field: first run after initialDelayMs,
// then delayMs between the end of one run and the start of the next (both in milliseconds).
// The returned handle cancels the future with cancel(false), i.e. without interrupting
// a run that is already in progress.
// NOTE(review): scheduler is null until beforeStart() has run - confirm callers only
// schedule after start-up.
304 public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
305 // wrap the task in a TimerAction and schedule it
306 ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
307 TimeUnit.MILLISECONDS);
309 // wrap the future in a "CancellableScheduledTask"
310 return () -> fut.cancel(false);
// Convenience wrapper: publishes the message on the Message.ADMIN channel.
314 public void publishAdmin(Message msg) {
315 publish(Message.ADMIN, msg);
// Stamps the message with its channel, serializes it and hands the text to the DMaaP
// manager. In the lines shown, serialization and publish failures are logged, not rethrown.
// NOTE(review): the "try {" and the validity check announced by the comment below are
// not visible in this extract.
319 public void publish(String channel, Message msg) {
320 logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
322 msg.setChannel(channel);
325 // ensure it's valid before we send it
328 String txt = serializer.encodeMsg(msg);
329 dmaapMgr.publish(txt);
331 } catch (JsonParseException e) {
332 logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
334 } catch (PoolingFeatureException e) {
335 logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
340 * Handles an event from the internal topic.
342 * @param commType comm infrastructure
343 * @param topic2 topic
// TopicListener callback; this manager registers itself as the listener in afterStart().
// NOTE(review): the null-event guard around the error log is not visible in this extract.
347 public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
// note: logs the manager's own topic field, not the topic2 argument
350 logger.error("null event on topic {}", topic);
// processed under the lock because handleInternal() may change the current state
354 synchronized (curLocker) {
355 // it's on the internal topic
356 handleInternal(event);
361 * Called by the PolicyController before it offers the event to the DroolsController.
362 * If the controller is locked, then it isn't processing events. However, they still
363 * need to be forwarded, thus in that case, they are decoded and forwarded.
365 * <p>On the other hand, if the controller is not locked, then we just return immediately
366 * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
367 * it instead, as it already has the decoded message.
369 * @param topic2 topic
371 * @return {@code true} if the event was handled by the manager, {@code false} if it
372 * must still be handled by the invoker
// Intercepts raw events only while the controller is locked; in that case the text is
// decoded here and routed through handleExternal().
374 public boolean beforeOffer(String topic2, String event) {
376 if (!controller.isLocked()) {
377 // we should NOT intercept this message - let the invoker handle it
// locked: decode first; decodeEvent() is documented to yield null when it cannot decode
381 return handleExternal(topic2, decodeEvent(topic2, event));
385 * Called by the DroolsController before it inserts the event into the rule engine.
387 * @param topic2 topic
388 * @param event event, as an object
389 * @return {@code true} if the event was handled by the manager, {@code false} if it
390 * must still be handled by the invoker
// The event arrives already decoded, so it goes straight to handleExternal().
392 public boolean beforeInsert(String topic2, Object event) {
393 return handleExternal(topic2, event);
397 * Handles an event from an external topic.
399 * @param topic2 topic
400 * @param event event, as an object, or {@code null} if it cannot be decoded
401 * @return {@code true} if the event was handled by the manager, {@code false} if it
402 * must still be handled by the invoker
// Entry point for external events: takes curLocker and delegates to the three-argument
// overload, passing the event's own hashCode() as the value used to look up its host.
// NOTE(review): the null check announced by the comment below is not visible in this extract.
404 private boolean handleExternal(String topic2, Object event) {
406 // no event - let the invoker handle it
410 synchronized (curLocker) {
411 return handleExternal(topic2, event, event.hashCode());
416 * Handles an event from an external topic.
418 * @param topic2 topic
419 * @param event event, as an object
420 * @param eventHashCode event's hash code
421 * @return {@code true} if the event was handled, {@code false} if the invoker should handle it
// Without bucket assignments there is nothing to route by, so the event is left to the
// invoker; otherwise handleEvent() decides based on the assigned host.
424 private boolean handleExternal(String topic2, Object event, int eventHashCode) {
425 if (assignments == null) {
426 // no bucket assignments yet - handle locally
427 logger.info("handle event locally for request {}", event);
429 // we did NOT consume the event
433 return handleEvent(topic2, event, eventHashCode);
438 * Handles an event from an external topic, based on the host to which its bucket is assigned.
440 * @param topic2 topic
441 * @param event event, as an object
442 * @param eventHashCode event's hash code
443 * @return {@code true} if the event was handled, {@code false} if the invoker should handle it
// Looks up the host assigned to the event's hash code and compares it with this host.
// Three outcomes are visible: unassigned bucket (discard), this host (let the controller
// handle it), some other host (discard).
446 private boolean handleEvent(String topic2, Object event, int eventHashCode) {
// host assigned for this hash code; checked for null just below
447 String target = assignments.getAssignedHost(eventHashCode);
449 if (target == null) {
451 * This bucket has no assignment - just discard the event
453 logger.warn("discarded event for unassigned bucket from topic {}", topic2);
457 if (target.equals(host)) {
459 * Message belongs to this host - allow the controller to handle it.
461 logger.info("handle local event for request {} from topic {}", event, topic2);
465 // not our message, consume the event
466 logger.warn("discarded event for host {} from topic {}", target, topic2);
471 * Decodes an event from a String into an event Object.
473 * @param topic2 topic
475 * @return the decoded event object, or {@code null} if it can't be decoded
// Decodes raw event text via decodeEventWrapper(), after checking with canDecodeEvent()
// that the controller's DroolsController supports decoding for the topic.
477 private Object decodeEvent(String topic2, String event) {
478 DroolsController drools = controller.getDrools();
480 // check if this topic has a decoder
482 if (!canDecodeEvent(drools, topic2)) {
484 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
485 drools.getArtifactId());
492 return decodeEventWrapper(drools, topic2, event);
// a decode failure is only logged at debug level, with the exception attached
494 } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
495 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
501 * Handles an event from the internal topic. This uses reflection to identify the
502 * appropriate process() method to invoke, based on the type of Message that was decoded.
505 * @param event the serialized {@link Message} read from the internal topic
// Decodes the text into a Message and dispatches it, via reflection, to the current
// state's process() overload for that message class. Failures are logged in the lines shown.
507 private void handleInternal(String event) {
// declared before the decode so the catch blocks can report the message class
508 Class<?> clazz = null;
511 Message msg = serializer.decodeMsg(event);
513 // get the class BEFORE checking the validity
514 clazz = msg.getClass();
// look up process(<concrete message class>) on the current state's class
518 var meth = current.getClass().getMethod("process", msg.getClass());
// the value returned by process() is handed to changeState() as the next state
519 changeState((State) meth.invoke(current, msg));
521 } catch (JsonParseException e) {
522 logger.warn("failed to decode message for topic {}", topic, e);
524 } catch (NoSuchMethodException | SecurityException e) {
525 logger.error("no processor for message {} for topic {}", clazz, topic, e);
527 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
528 | PoolingFeatureException e) {
529 logger.error("failed to process message {} for topic {}", clazz, topic, e);
// Under curLocker, logs how many hosts the new assignments cover (0 when asgn is null).
// NOTE(review): the line that stores asgn is not visible in this extract.
534 public void startDistributing(BucketAssignments asgn) {
535 synchronized (curLocker) {
536 int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
537 logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
// Factory for the next state: returns a new StartState bound to this manager.
543 public State goStart() {
544 return new StartState(this);
// Factory for the next state: returns a new QueryState bound to this manager.
548 public State goQuery() {
549 return new QueryState(this);
// Factory for the next state: returns a new ActiveState bound to this manager.
553 public State goActive() {
// count down the latch on every call, before the ActiveState is created
554 activeLatch.countDown();
555 return new ActiveState(this);
// Factory for the next state: returns a new InactiveState bound to this manager.
559 public State goInactive() {
560 return new InactiveState(this);
564 * Action to run a timer task. Only runs the task if the machine is still in the state
565 * that it was in when the timer was created.
// Non-static inner class: it reads the enclosing manager's current state and curLocker.
// NOTE(review): the assignment of the task field and the run() signature are not visible
// in this extract.
567 private class TimerAction implements Runnable {
570 * State of the machine when the timer was created.
572 private State origState;
575 * Task to be executed.
577 private StateTimerTask task;
582 * @param task task to execute when this timer runs
584 public TimerAction(StateTimerTask task) {
// snapshot of whichever state object is current at construction time
585 this.origState = current;
591 synchronized (curLocker) {
// identity comparison: the task is skipped if the machine now holds a different state object
592 if (current == origState) {
// the value returned by fire() is handed to changeState() as the next state
593 changeState(task.fire());
600 * Creates a DMaaP manager.
602 * @param topic name of the internal DMaaP topic
603 * @return a new DMaaP manager
604 * @throws PoolingFeatureException if an error occurs
// Factory method called from the constructor; protected, presumably so that tests can
// substitute their own manager - TODO confirm.
606 protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
607 return new DmaapManager(topic);
611 * Creates a scheduled thread pool.
613 * @return a new scheduled thread pool
// Factory method used by beforeStart(); creates the pool with a core size of one thread.
615 protected ScheduledThreadPoolExecutor makeScheduler() {
616 return new ScheduledThreadPoolExecutor(1);
620 * Determines if the event can be decoded.
622 * @param drools drools controller
623 * @param topic topic on which the event was received
624 * @return {@code true} if the event can be decoded, {@code false} otherwise
// Delegates to the event protocol coder manager, passing the drools controller's
// group id and artifact id.
// NOTE(review): the remainder of the argument list is on a line not visible in this extract.
626 protected boolean canDecodeEvent(DroolsController drools, String topic) {
627 return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(),
634 * @param drools drools controller
635 * @param topic topic on which the event was received
636 * @param event event text to be decoded
637 * @return the decoded event
638 * @throws IllegalArgumentException illegal argument
639 * @throws UnsupportedOperationException unsupported operation
640 * @throws IllegalStateException illegal state
642 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
643 return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic,