2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import java.io.IOException;
25 import java.lang.reflect.InvocationTargetException;
26 import java.lang.reflect.Method;
28 import java.util.Properties;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ScheduledFuture;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
34 import org.onap.policy.common.endpoints.event.comm.TopicListener;
35 import org.onap.policy.common.utils.properties.SpecProperties;
36 import org.onap.policy.drools.controller.DroolsController;
37 import org.onap.policy.drools.pooling.extractor.ClassExtractors;
38 import org.onap.policy.drools.pooling.message.BucketAssignments;
39 import org.onap.policy.drools.pooling.message.Forward;
40 import org.onap.policy.drools.pooling.message.Leader;
41 import org.onap.policy.drools.pooling.message.Message;
42 import org.onap.policy.drools.pooling.message.Offline;
43 import org.onap.policy.drools.pooling.state.ActiveState;
44 import org.onap.policy.drools.pooling.state.IdleState;
45 import org.onap.policy.drools.pooling.state.InactiveState;
46 import org.onap.policy.drools.pooling.state.QueryState;
47 import org.onap.policy.drools.pooling.state.StartState;
48 import org.onap.policy.drools.pooling.state.State;
49 import org.onap.policy.drools.pooling.state.StateTimerTask;
50 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
51 import org.onap.policy.drools.system.PolicyController;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
/**
 * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
 * events coming from external topics are saved in a queue for later processing. Once
 * assignments are made, the saved events are processed. In addition, while the controller
 * is locked, events are still forwarded to other hosts and bucket assignments are still
 * updated, based on any {@link Leader} messages that it receives.
 */
62 public class PoolingManagerImpl implements PoolingManager, TopicListener {
64 private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
67 * Maximum number of times a message can be forwarded.
69 public static final int MAX_HOPS = 5;
74 private final String host;
77 * Properties with which this was configured.
79 private final PoolingProperties props;
82 * Associated controller.
84 private final PolicyController controller;
87 * Where to offer events that have been forwarded to this host (i.e, the controller).
89 private final TopicListener listener;
92 * Decremented each time the manager enters the Active state. Used by junit tests.
94 private final CountDownLatch activeLatch;
97 * Used to encode & decode request objects received from & sent to a rule engine.
99 private final Serializer serializer;
102 * Internal DMaaP topic used by this controller.
104 private final String topic;
107 * Manager for the internal DMaaP topic.
109 private final DmaapManager dmaapMgr;
112 * Used to extract the request id from the decoded message.
114 private final ClassExtractors extractors;
117 * Lock used while updating {@link #current}. In general, public methods must use
118 * this, while private methods assume the lock is already held.
120 private final Object curLocker = new Object();
125 * <p>This uses a finite state machine, wherein the state object contains all of the data
126 * relevant to that state. Each state object has a process() method, specific to each
127 * type of {@link Message} subclass. The method returns the next state object, or
128 * {@code null} if the state is to remain the same.
130 private State current;
133 * Current bucket assignments or {@code null}.
135 private BucketAssignments assignments = null;
138 * Pool used to execute timers.
140 private ScheduledThreadPoolExecutor scheduler = null;
143 * {@code True} if events offered by the controller should be intercepted,
144 * {@code false} otherwise.
146 private boolean intercept = true;
/**
 * Constructs the manager, initializing all of the data structures.
 *
 * @param host name/uuid of this host
 * @param controller controller with which this is associated
 * @param props feature properties specific to the controller
 * @param activeLatch latch to be decremented each time the manager enters the Active
 *        state
 * @throws PoolingFeatureRtException if the controller is not a {@link TopicListener},
 *         or if a {@link PoolingFeatureException} occurs while setting up the manager
 */
public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
                CountDownLatch activeLatch) {
    this.host = host;
    this.controller = controller;
    this.props = props;
    this.activeLatch = activeLatch;

    try {
        // forwarded events are offered back to the controller via its listener interface
        this.listener = (TopicListener) controller;
        this.serializer = new Serializer();
        this.topic = props.getPoolingTopic();
        this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
        this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
        this.current = new IdleState(this);

        logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);

    } catch (ClassCastException e) {
        logger.error("not a topic listener, controller {}", controller.getName());
        throw new PoolingFeatureRtException(e);

    } catch (PoolingFeatureException e) {
        logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
        throw new PoolingFeatureRtException(e);
    }
}
185 * Should only be used by junit tests.
187 * @return the current state
189 protected State getCurrent() {
190 synchronized (curLocker) {
196 public String getHost() {
201 public String getTopic() {
206 public PoolingProperties getProperties() {
211 * Makes properties for configuring extractors.
213 * @param controller the controller for which the extractors will be configured
214 * @param source properties from which to get the extractor properties
215 * @return extractor properties
217 private Properties makeExtractorProps(PolicyController controller, Properties source) {
218 return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
222 * Indicates that the controller is about to start. Starts the publisher for the
223 * internal topic, and creates a thread pool for the timers.
225 public void beforeStart() {
226 synchronized (curLocker) {
227 if (scheduler == null) {
228 dmaapMgr.startPublisher();
230 logger.debug("make scheduler thread for topic {}", getTopic());
231 scheduler = makeScheduler();
234 * Only a handful of timers at any moment, thus we can afford to take the
235 * time to remove them when they're cancelled.
237 scheduler.setRemoveOnCancelPolicy(true);
238 scheduler.setMaximumPoolSize(1);
239 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
240 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
246 * Indicates that the controller has successfully started. Starts the consumer for the
247 * internal topic, enters the {@link StartState}, and sets the filter for the initial
250 public void afterStart() {
251 synchronized (curLocker) {
252 if (current instanceof IdleState) {
253 dmaapMgr.startConsumer(this);
254 changeState(new StartState(this));
260 * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
261 * and the current state.
263 public void beforeStop() {
264 ScheduledThreadPoolExecutor sched;
266 synchronized (curLocker) {
270 if (!(current instanceof IdleState)) {
271 changeState(new IdleState(this));
272 dmaapMgr.stopConsumer(this);
273 publishAdmin(new Offline(getHost()));
280 logger.debug("stop scheduler for topic {}", getTopic());
286 * Indicates that the controller has stopped. Stops the publisher and logs a warning
287 * if any events are still in the queue.
289 public void afterStop() {
290 synchronized (curLocker) {
292 * stop the publisher, but allow time for any Offline message to be
295 dmaapMgr.stopPublisher(props.getOfflinePubWaitMs());
300 * Indicates that the controller is about to be locked. Enters the idle state, as all
301 * it will be doing is forwarding messages.
303 public void beforeLock() {
304 logger.info("locking manager for topic {}", getTopic());
306 synchronized (curLocker) {
307 changeState(new IdleState(this));
312 * Indicates that the controller has been unlocked. Enters the start state, if the
313 * controller is running.
315 public void afterUnlock() {
316 logger.info("unlocking manager for topic {}", getTopic());
318 synchronized (curLocker) {
319 if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
320 changeState(new StartState(this));
326 * Changes the finite state machine to a new state, provided the new state is not
329 * @param newState new state, or {@code null} if to remain unchanged
331 private void changeState(State newState) {
332 if (newState != null) {
333 current.cancelTimers();
336 // set the filter before starting the state
337 setFilter(newState.getFilter());
343 * Sets the server-side filter for the internal topic.
345 * @param filter new filter to be used
347 private void setFilter(Map<String, Object> filter) {
349 dmaapMgr.setFilter(serializer.encodeFilter(filter));
351 } catch (JsonProcessingException e) {
352 logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
354 } catch (PoolingFeatureException e) {
355 logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
360 public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
361 // wrap the task in a TimerAction and schedule it
362 ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
364 // wrap the future in a "CancellableScheduledTask"
365 return () -> fut.cancel(false);
369 public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
370 // wrap the task in a TimerAction and schedule it
371 ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
372 TimeUnit.MILLISECONDS);
374 // wrap the future in a "CancellableScheduledTask"
375 return () -> fut.cancel(false);
379 public void publishAdmin(Message msg) {
380 publish(Message.ADMIN, msg);
384 public void publish(String channel, Message msg) {
385 logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
387 msg.setChannel(channel);
390 // ensure it's valid before we send it
393 String txt = serializer.encodeMsg(msg);
394 dmaapMgr.publish(txt);
396 } catch (JsonProcessingException e) {
397 logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
399 } catch (PoolingFeatureException e) {
400 logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
405 * Handles an event from the internal topic.
407 * @param commType comm infrastructure
408 * @param topic2 topic
412 public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
415 logger.error("null event on topic {}", topic);
419 synchronized (curLocker) {
420 // it's on the internal topic
421 handleInternal(event);
426 * Called by the PolicyController before it offers the event to the DroolsController.
427 * If the controller is locked, then it isn't processing events. However, they still
428 * need to be forwarded, thus in that case, they are decoded and forwarded.
430 * <p>On the other hand, if the controller is not locked, then we just return immediately
431 * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
432 * it instead, as it already has the decoded message.
434 * @param protocol protocol
435 * @param topic2 topic
437 * @return {@code true} if the event was handled by the manager, {@code false} if it
438 * must still be handled by the invoker
440 public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
442 if (!controller.isLocked() || !intercept) {
443 // we should NOT intercept this message - let the invoker handle it
447 return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
451 * Called by the DroolsController before it inserts the event into the rule engine.
453 * @param protocol protocol
454 * @param topic2 topic
455 * @param event original event text, as received from the Bus
456 * @param event2 event, as an object
457 * @return {@code true} if the event was handled by the manager, {@code false} if it
458 * must still be handled by the invoker
460 public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
463 // we should NOT intercept this message - let the invoker handle it
467 return handleExternal(protocol, topic2, event, extractRequestId(event2));
471 * Handles an event from an external topic.
473 * @param protocol protocol
474 * @param topic2 topic
476 * @param reqid request id extracted from the event, or {@code null} if it couldn't be
478 * @return {@code true} if the event was handled by the manager, {@code false} if it
479 * must still be handled by the invoker
481 private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
483 // no request id - let the invoker handle it
487 if (reqid.isEmpty()) {
488 logger.warn("handle locally due to empty request id for topic {}", topic2);
489 // no request id - let the invoker handle it
493 Forward ev = makeForward(protocol, topic2, event, reqid);
495 // invalid args - consume the message
496 logger.warn("constructed an invalid Forward message on topic {}", getTopic());
500 synchronized (curLocker) {
501 return handleExternal(ev);
506 * Handles an event from an external topic.
509 * @return {@code true} if the event was handled, {@code false} if the invoker should
512 private boolean handleExternal(Forward event) {
513 if (assignments == null) {
514 // no bucket assignments yet - handle locally
515 logger.info("handle event locally for request {}", event.getRequestId());
517 // we did NOT consume the event
521 return handleEvent(event);
526 * Handles a {@link Forward} event, possibly forwarding it again.
529 * @return {@code true} if the event was handled, {@code false} if the invoker should
532 private boolean handleEvent(Forward event) {
533 String target = assignments.getAssignedHost(event.getRequestId().hashCode());
535 if (target == null) {
537 * This bucket has no assignment - just discard the event
539 logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
543 if (target.equals(host)) {
545 * Message belongs to this host - allow the controller to handle it.
547 logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
551 // forward to a different host, if hop count has been exhausted
552 if (event.getNumHops() > MAX_HOPS) {
553 logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
557 logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
559 publish(target, event);
562 // either way, consume the event
567 * Extract the request id from an event object.
569 * @param event the event object, or {@code null}
570 * @return the event's request id, or {@code null} if it can't be extracted
572 private String extractRequestId(Object event) {
577 Object reqid = extractors.extract(event);
578 return (reqid != null ? reqid.toString() : null);
582 * Decodes an event from a String into an event Object.
584 * @param topic2 topic
586 * @return the decoded event object, or {@code null} if it can't be decoded
588 private Object decodeEvent(String topic2, String event) {
589 DroolsController drools = controller.getDrools();
591 // check if this topic has a decoder
593 if (!canDecodeEvent(drools, topic2)) {
595 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
596 drools.getArtifactId());
603 return decodeEventWrapper(drools, topic2, event);
605 } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
606 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
612 * Makes a {@link Forward}, and validates its contents.
614 * @param protocol protocol
615 * @param topic2 topic
617 * @param reqid request id
618 * @return a new message, or {@code null} if the message was invalid
620 private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
622 Forward ev = new Forward(host, protocol, topic2, event, reqid);
624 // required for the validity check
631 } catch (PoolingFeatureException e) {
632 logger.error("invalid message for topic {}", topic2, e);
638 public void handle(Forward event) {
639 synchronized (curLocker) {
640 if (!handleExternal(event)) {
641 // this host should handle it - inject it
648 * Injects an event into the controller.
652 private void inject(Forward event) {
653 logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
657 listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
665 * Handles an event from the internal topic. This uses reflection to identify the
666 * appropriate process() method to invoke, based on the type of Message that was
669 * @param event the serialized {@link Message} read from the internal topic
671 private void handleInternal(String event) {
672 Class<?> clazz = null;
675 Message msg = serializer.decodeMsg(event);
677 // get the class BEFORE checking the validity
678 clazz = msg.getClass();
682 Method meth = current.getClass().getMethod("process", msg.getClass());
683 changeState((State) meth.invoke(current, msg));
685 } catch (IOException e) {
686 logger.warn("failed to decode message for topic {}", topic, e);
688 } catch (NoSuchMethodException | SecurityException e) {
689 logger.error("no processor for message {} for topic {}", clazz, topic, e);
691 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
692 | PoolingFeatureException e) {
693 logger.error("failed to process message {} for topic {}", clazz, topic, e);
698 public void startDistributing(BucketAssignments asgn) {
699 synchronized (curLocker) {
700 int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
701 logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
707 public BucketAssignments getAssignments() {
712 public State goStart() {
713 return new StartState(this);
717 public State goQuery() {
718 return new QueryState(this);
722 public State goActive() {
723 activeLatch.countDown();
724 return new ActiveState(this);
728 public State goInactive() {
729 return new InactiveState(this);
733 * Action to run a timer task. Only runs the task if the machine is still in the state
734 * that it was in when the timer was created.
736 private class TimerAction implements Runnable {
739 * State of the machine when the timer was created.
741 private State origState;
744 * Task to be executed.
746 private StateTimerTask task;
751 * @param task task to execute when this timer runs
753 public TimerAction(StateTimerTask task) {
754 this.origState = current;
760 synchronized (curLocker) {
761 if (current == origState) {
762 changeState(task.fire());
/*
 * The remaining methods may be overridden by junit tests.
 */
773 * Creates object extractors.
775 * @param props properties used to configure the extractors
776 * @return a new set of extractors
778 protected ClassExtractors makeClassExtractors(Properties props) {
779 return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
780 PoolingProperties.EXTRACTOR_TYPE);
784 * Creates a DMaaP manager.
786 * @param topic name of the internal DMaaP topic
787 * @return a new DMaaP manager
788 * @throws PoolingFeatureException if an error occurs
790 protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
791 return new DmaapManager(topic);
795 * Creates a scheduled thread pool.
797 * @return a new scheduled thread pool
799 protected ScheduledThreadPoolExecutor makeScheduler() {
800 return new ScheduledThreadPoolExecutor(1);
804 * Determines if the event can be decoded.
806 * @param drools drools controller
807 * @param topic topic on which the event was received
808 * @return {@code true} if the event can be decoded, {@code false} otherwise
810 protected boolean canDecodeEvent(DroolsController drools, String topic) {
811 return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic);
817 * @param drools drools controller
818 * @param topic topic on which the event was received
819 * @param event event text to be decoded
820 * @return the decoded event
821 * @throws IllegalArgumentException illegal argument
822 * @throw UnsupportedOperationException unsupported operation
823 * @throws IllegalStateException illegal state
825 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
826 return EventProtocolCoder.manager.decode(drools.getGroupId(), drools.getArtifactId(), topic, event);