/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.drools.pooling;
23 import com.google.gson.JsonParseException;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
31 import org.onap.policy.common.endpoints.event.comm.TopicListener;
32 import org.onap.policy.drools.controller.DroolsController;
33 import org.onap.policy.drools.pooling.message.BucketAssignments;
34 import org.onap.policy.drools.pooling.message.Leader;
35 import org.onap.policy.drools.pooling.message.Message;
36 import org.onap.policy.drools.pooling.message.Offline;
37 import org.onap.policy.drools.pooling.state.ActiveState;
38 import org.onap.policy.drools.pooling.state.IdleState;
39 import org.onap.policy.drools.pooling.state.InactiveState;
40 import org.onap.policy.drools.pooling.state.QueryState;
41 import org.onap.policy.drools.pooling.state.StartState;
42 import org.onap.policy.drools.pooling.state.State;
43 import org.onap.policy.drools.pooling.state.StateTimerTask;
44 import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
45 import org.onap.policy.drools.system.PolicyController;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
/**
 * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
 * events coming from external topics are left for the invoker to handle locally. Once
 * assignments are made, an event is handled locally only if its bucket is assigned to
 * this host; events whose bucket is unassigned, or assigned to another host, are
 * discarded. In addition, while the controller is locked, events are still decoded and
 * checked against the bucket assignments, and bucket assignments are still updated,
 * based on any {@link Leader} messages that it receives.
 */
56 public class PoolingManagerImpl implements PoolingManager, TopicListener {
58 private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
61 * Maximum number of times a message can be forwarded.
63 public static final int MAX_HOPS = 5;
68 private final String host;
71 * Properties with which this was configured.
73 private final PoolingProperties props;
76 * Associated controller.
78 private final PolicyController controller;
81 * Decremented each time the manager enters the Active state. Used by junit tests.
83 private final CountDownLatch activeLatch;
86 * Used to encode & decode request objects received from & sent to a rule engine.
88 private final Serializer serializer;
91 * Internal DMaaP topic used by this controller.
93 private final String topic;
96 * Manager for the internal DMaaP topic.
98 private final DmaapManager dmaapMgr;
101 * Lock used while updating {@link #current}. In general, public methods must use
102 * this, while private methods assume the lock is already held.
104 private final Object curLocker = new Object();
109 * <p>This uses a finite state machine, wherein the state object contains all of the data
110 * relevant to that state. Each state object has a process() method, specific to each
111 * type of {@link Message} subclass. The method returns the next state object, or
112 * {@code null} if the state is to remain the same.
114 private State current;
117 * Current bucket assignments or {@code null}.
119 private BucketAssignments assignments = null;
122 * Pool used to execute timers.
124 private ScheduledThreadPoolExecutor scheduler = null;
/**
 * Constructs the manager, initializing all of the data structures. The manager
 * begins in the {@link IdleState}.
 *
 * @param host name/uuid of this host
 * @param controller controller with which this is associated
 * @param props feature properties specific to the controller
 * @param activeLatch latch to be decremented each time the manager enters the Active
 *        state
 * @throws PoolingFeatureRtException if the DMaaP manager for the internal topic
 *         cannot be created
 */
public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
                CountDownLatch activeLatch) {
    this.host = host;
    this.controller = controller;
    this.props = props;
    this.activeLatch = activeLatch;

    try {
        this.serializer = new Serializer();
        this.topic = props.getPoolingTopic();
        // created through the protected factory method so that subclasses can substitute it
        this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
        this.current = new IdleState(this);

        logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);

    } catch (ClassCastException e) {
        logger.error("not a topic listener, controller {}", controller.getName());
        throw new PoolingFeatureRtException(e);

    } catch (PoolingFeatureException e) {
        logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
        throw new PoolingFeatureRtException(e);
    }
}
161 * Should only be used by junit tests.
163 * @return the current state
165 protected State getCurrent() {
166 synchronized (curLocker) {
172 public String getHost() {
177 public String getTopic() {
182 public PoolingProperties getProperties() {
187 * Indicates that the controller is about to start. Starts the publisher for the
188 * internal topic, and creates a thread pool for the timers.
190 public void beforeStart() {
191 synchronized (curLocker) {
192 if (scheduler == null) {
193 dmaapMgr.startPublisher();
195 logger.debug("make scheduler thread for topic {}", getTopic());
196 scheduler = makeScheduler();
199 * Only a handful of timers at any moment, thus we can afford to take the
200 * time to remove them when they're cancelled.
202 scheduler.setRemoveOnCancelPolicy(true);
203 scheduler.setMaximumPoolSize(1);
204 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
205 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
211 * Indicates that the controller has successfully started. Starts the consumer for the
212 * internal topic, enters the {@link StartState}, and sets the filter for the initial
215 public void afterStart() {
216 synchronized (curLocker) {
217 if (current instanceof IdleState) {
218 dmaapMgr.startConsumer(this);
219 changeState(new StartState(this));
225 * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
226 * and the current state.
228 public void beforeStop() {
229 ScheduledThreadPoolExecutor sched;
231 synchronized (curLocker) {
235 if (!(current instanceof IdleState)) {
236 changeState(new IdleState(this));
237 dmaapMgr.stopConsumer(this);
238 publishAdmin(new Offline(getHost()));
245 logger.debug("stop scheduler for topic {}", getTopic());
251 * Indicates that the controller has stopped. Stops the publisher and logs a warning
252 * if any events are still in the queue.
254 public void afterStop() {
255 synchronized (curLocker) {
257 * stop the publisher, but allow time for any Offline message to be
260 dmaapMgr.stopPublisher(props.getOfflinePubWaitMs());
265 * Indicates that the controller is about to be locked. Enters the idle state, as all
266 * it will be doing is forwarding messages.
268 public void beforeLock() {
269 logger.info("locking manager for topic {}", getTopic());
271 synchronized (curLocker) {
272 changeState(new IdleState(this));
277 * Indicates that the controller has been unlocked. Enters the start state, if the
278 * controller is running.
280 public void afterUnlock() {
281 logger.info("unlocking manager for topic {}", getTopic());
283 synchronized (curLocker) {
284 if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
285 changeState(new StartState(this));
291 * Changes the finite state machine to a new state, provided the new state is not
294 * @param newState new state, or {@code null} if to remain unchanged
296 private void changeState(State newState) {
297 if (newState != null) {
298 current.cancelTimers();
306 public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
307 // wrap the task in a TimerAction and schedule it
308 ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
310 // wrap the future in a "CancellableScheduledTask"
311 return () -> fut.cancel(false);
315 public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
316 // wrap the task in a TimerAction and schedule it
317 ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
318 TimeUnit.MILLISECONDS);
320 // wrap the future in a "CancellableScheduledTask"
321 return () -> fut.cancel(false);
325 public void publishAdmin(Message msg) {
326 publish(Message.ADMIN, msg);
330 public void publish(String channel, Message msg) {
331 logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
333 msg.setChannel(channel);
336 // ensure it's valid before we send it
339 String txt = serializer.encodeMsg(msg);
340 dmaapMgr.publish(txt);
342 } catch (JsonParseException e) {
343 logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
345 } catch (PoolingFeatureException e) {
346 logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
351 * Handles an event from the internal topic.
353 * @param commType comm infrastructure
354 * @param topic2 topic
358 public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
361 logger.error("null event on topic {}", topic);
365 synchronized (curLocker) {
366 // it's on the internal topic
367 handleInternal(event);
372 * Called by the PolicyController before it offers the event to the DroolsController.
373 * If the controller is locked, then it isn't processing events. However, they still
374 * need to be forwarded, thus in that case, they are decoded and forwarded.
376 * <p>On the other hand, if the controller is not locked, then we just return immediately
377 * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
378 * it instead, as it already has the decoded message.
380 * @param topic2 topic
382 * @return {@code true} if the event was handled by the manager, {@code false} if it
383 * must still be handled by the invoker
385 public boolean beforeOffer(String topic2, String event) {
387 if (!controller.isLocked()) {
388 // we should NOT intercept this message - let the invoker handle it
392 return handleExternal(topic2, decodeEvent(topic2, event));
396 * Called by the DroolsController before it inserts the event into the rule engine.
398 * @param topic2 topic
399 * @param event event, as an object
400 * @return {@code true} if the event was handled by the manager, {@code false} if it
401 * must still be handled by the invoker
403 public boolean beforeInsert(String topic2, Object event) {
404 return handleExternal(topic2, event);
408 * Handles an event from an external topic.
410 * @param topic2 topic
411 * @param event event, as an object, or {@code null} if it cannot be decoded
412 * @return {@code true} if the event was handled by the manager, {@code false} if it
413 * must still be handled by the invoker
415 private boolean handleExternal(String topic2, Object event) {
417 // no event - let the invoker handle it
421 synchronized (curLocker) {
422 return handleExternal(topic2, event, event.hashCode());
427 * Handles an event from an external topic.
429 * @param topic2 topic
430 * @param event event, as an object
431 * @param eventHashCode event's hash code
432 * @return {@code true} if the event was handled, {@code false} if the invoker should
435 private boolean handleExternal(String topic2, Object event, int eventHashCode) {
436 if (assignments == null) {
437 // no bucket assignments yet - handle locally
438 logger.info("handle event locally for request {}", event);
440 // we did NOT consume the event
444 return handleEvent(topic2, event, eventHashCode);
449 * Handles a {@link Forward} event, possibly forwarding it again.
451 * @param topic2 topic
452 * @param event event, as an object
453 * @param eventHashCode event's hash code
454 * @return {@code true} if the event was handled, {@code false} if the invoker should
457 private boolean handleEvent(String topic2, Object event, int eventHashCode) {
458 String target = assignments.getAssignedHost(eventHashCode);
460 if (target == null) {
462 * This bucket has no assignment - just discard the event
464 logger.warn("discarded event for unassigned bucket from topic {}", topic2);
468 if (target.equals(host)) {
470 * Message belongs to this host - allow the controller to handle it.
472 logger.info("handle local event for request {} from topic {}", event, topic2);
476 // not our message, consume the event
477 logger.warn("discarded event for host {} from topic {}", target, topic2);
482 * Decodes an event from a String into an event Object.
484 * @param topic2 topic
486 * @return the decoded event object, or {@code null} if it can't be decoded
488 private Object decodeEvent(String topic2, String event) {
489 DroolsController drools = controller.getDrools();
491 // check if this topic has a decoder
493 if (!canDecodeEvent(drools, topic2)) {
495 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
496 drools.getArtifactId());
503 return decodeEventWrapper(drools, topic2, event);
505 } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
506 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
512 * Handles an event from the internal topic. This uses reflection to identify the
513 * appropriate process() method to invoke, based on the type of Message that was
516 * @param event the serialized {@link Message} read from the internal topic
518 private void handleInternal(String event) {
519 Class<?> clazz = null;
522 Message msg = serializer.decodeMsg(event);
524 // get the class BEFORE checking the validity
525 clazz = msg.getClass();
529 Method meth = current.getClass().getMethod("process", msg.getClass());
530 changeState((State) meth.invoke(current, msg));
532 } catch (JsonParseException e) {
533 logger.warn("failed to decode message for topic {}", topic, e);
535 } catch (NoSuchMethodException | SecurityException e) {
536 logger.error("no processor for message {} for topic {}", clazz, topic, e);
538 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
539 | PoolingFeatureException e) {
540 logger.error("failed to process message {} for topic {}", clazz, topic, e);
545 public void startDistributing(BucketAssignments asgn) {
546 synchronized (curLocker) {
547 int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
548 logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
554 public BucketAssignments getAssignments() {
559 public State goStart() {
560 return new StartState(this);
564 public State goQuery() {
565 return new QueryState(this);
569 public State goActive() {
570 activeLatch.countDown();
571 return new ActiveState(this);
575 public State goInactive() {
576 return new InactiveState(this);
580 * Action to run a timer task. Only runs the task if the machine is still in the state
581 * that it was in when the timer was created.
583 private class TimerAction implements Runnable {
586 * State of the machine when the timer was created.
588 private State origState;
591 * Task to be executed.
593 private StateTimerTask task;
598 * @param task task to execute when this timer runs
600 public TimerAction(StateTimerTask task) {
601 this.origState = current;
607 synchronized (curLocker) {
608 if (current == origState) {
609 changeState(task.fire());
616 * Creates a DMaaP manager.
618 * @param topic name of the internal DMaaP topic
619 * @return a new DMaaP manager
620 * @throws PoolingFeatureException if an error occurs
622 protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
623 return new DmaapManager(topic);
627 * Creates a scheduled thread pool.
629 * @return a new scheduled thread pool
631 protected ScheduledThreadPoolExecutor makeScheduler() {
632 return new ScheduledThreadPoolExecutor(1);
636 * Determines if the event can be decoded.
638 * @param drools drools controller
639 * @param topic topic on which the event was received
640 * @return {@code true} if the event can be decoded, {@code false} otherwise
642 protected boolean canDecodeEvent(DroolsController drools, String topic) {
643 return EventProtocolCoderConstants.getManager().isDecodingSupported(drools.getGroupId(), drools.getArtifactId(),
650 * @param drools drools controller
651 * @param topic topic on which the event was received
652 * @param event event text to be decoded
653 * @return the decoded event
654 * @throws IllegalArgumentException illegal argument
655 * @throw UnsupportedOperationException unsupported operation
656 * @throws IllegalStateException illegal state
658 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
659 return EventProtocolCoderConstants.getManager().decode(drools.getGroupId(), drools.getArtifactId(), topic,