2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import com.google.gson.JsonParseException;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
27 import java.util.Properties;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
33 import org.onap.policy.common.endpoints.event.comm.TopicListener;
34 import org.onap.policy.common.utils.properties.SpecProperties;
35 import org.onap.policy.drools.controller.DroolsController;
36 import org.onap.policy.drools.pooling.extractor.ClassExtractors;
37 import org.onap.policy.drools.pooling.message.BucketAssignments;
38 import org.onap.policy.drools.pooling.message.Forward;
39 import org.onap.policy.drools.pooling.message.Leader;
40 import org.onap.policy.drools.pooling.message.Message;
41 import org.onap.policy.drools.pooling.message.Offline;
42 import org.onap.policy.drools.pooling.state.ActiveState;
43 import org.onap.policy.drools.pooling.state.IdleState;
44 import org.onap.policy.drools.pooling.state.InactiveState;
45 import org.onap.policy.drools.pooling.state.QueryState;
46 import org.onap.policy.drools.pooling.state.StartState;
47 import org.onap.policy.drools.pooling.state.State;
48 import org.onap.policy.drools.pooling.state.StateTimerTask;
49 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
56 * events coming from external topics are saved in a queue for later processing. Once
57 * assignments are made, the saved events are processed. In addition, while the controller
58 * is locked, events are still forwarded to other hosts and bucket assignments are still
59 * updated, based on any {@link Leader} messages that it receives.
61 public class PoolingManagerImpl implements PoolingManager, TopicListener {
63 private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
66 * Maximum number of times a message can be forwarded.
68 public static final int MAX_HOPS = 5;
73 private final String host;
76 * Properties with which this was configured.
78 private final PoolingProperties props;
81 * Associated controller.
83 private final PolicyController controller;
86 * Where to offer events that have been forwarded to this host (i.e, the controller).
88 private final TopicListener listener;
91 * Decremented each time the manager enters the Active state. Used by junit tests.
93 private final CountDownLatch activeLatch;
96 * Used to encode & decode request objects received from & sent to a rule engine.
98 private final Serializer serializer;
101 * Internal DMaaP topic used by this controller.
103 private final String topic;
106 * Manager for the internal DMaaP topic.
108 private final DmaapManager dmaapMgr;
111 * Used to extract the request id from the decoded message.
113 private final ClassExtractors extractors;
116 * Lock used while updating {@link #current}. In general, public methods must use
117 * this, while private methods assume the lock is already held.
119 private final Object curLocker = new Object();
124 * <p>This uses a finite state machine, wherein the state object contains all of the data
125 * relevant to that state. Each state object has a process() method, specific to each
126 * type of {@link Message} subclass. The method returns the next state object, or
127 * {@code null} if the state is to remain the same.
129 private State current;
132 * Current bucket assignments or {@code null}.
134 private BucketAssignments assignments = null;
137 * Pool used to execute timers.
139 private ScheduledThreadPoolExecutor scheduler = null;
142 * {@code True} if events offered by the controller should be intercepted,
143 * {@code false} otherwise.
145 private boolean intercept = true;
148 * Constructs the manager, initializing all of the data structures.
150 * @param host name/uuid of this host
151 * @param controller controller with which this is associated
152 * @param props feature properties specific to the controller
153 * @param activeLatch latch to be decremented each time the manager enters the Active
156 public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
157 CountDownLatch activeLatch) {
159 this.controller = controller;
161 this.activeLatch = activeLatch;
164 this.listener = (TopicListener) controller;
165 this.serializer = new Serializer();
166 this.topic = props.getPoolingTopic();
167 this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
168 this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
169 this.current = new IdleState(this);
171 logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
173 } catch (ClassCastException e) {
174 logger.error("not a topic listener, controller {}", controller.getName());
175 throw new PoolingFeatureRtException(e);
177 } catch (PoolingFeatureException e) {
178 logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
179 throw new PoolingFeatureRtException(e);
184 * Should only be used by junit tests.
186 * @return the current state
188 protected State getCurrent() {
189 synchronized (curLocker) {
195 public String getHost() {
200 public String getTopic() {
205 public PoolingProperties getProperties() {
210 * Makes properties for configuring extractors.
212 * @param controller the controller for which the extractors will be configured
213 * @param source properties from which to get the extractor properties
214 * @return extractor properties
216 private Properties makeExtractorProps(PolicyController controller, Properties source) {
217 return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
221 * Indicates that the controller is about to start. Starts the publisher for the
222 * internal topic, and creates a thread pool for the timers.
224 public void beforeStart() {
225 synchronized (curLocker) {
226 if (scheduler == null) {
227 dmaapMgr.startPublisher();
229 logger.debug("make scheduler thread for topic {}", getTopic());
230 scheduler = makeScheduler();
233 * Only a handful of timers at any moment, thus we can afford to take the
234 * time to remove them when they're cancelled.
236 scheduler.setRemoveOnCancelPolicy(true);
237 scheduler.setMaximumPoolSize(1);
238 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
239 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
245 * Indicates that the controller has successfully started. Starts the consumer for the
246 * internal topic, enters the {@link StartState}, and sets the filter for the initial
249 public void afterStart() {
250 synchronized (curLocker) {
251 if (current instanceof IdleState) {
252 dmaapMgr.startConsumer(this);
253 changeState(new StartState(this));
259 * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
260 * and the current state.
262 public void beforeStop() {
263 ScheduledThreadPoolExecutor sched;
265 synchronized (curLocker) {
269 if (!(current instanceof IdleState)) {
270 changeState(new IdleState(this));
271 dmaapMgr.stopConsumer(this);
272 publishAdmin(new Offline(getHost()));
279 logger.debug("stop scheduler for topic {}", getTopic());
285 * Indicates that the controller has stopped. Stops the publisher and logs a warning
286 * if any events are still in the queue.
288 public void afterStop() {
289 synchronized (curLocker) {
291 * stop the publisher, but allow time for any Offline message to be
294 dmaapMgr.stopPublisher(props.getOfflinePubWaitMs());
299 * Indicates that the controller is about to be locked. Enters the idle state, as all
300 * it will be doing is forwarding messages.
302 public void beforeLock() {
303 logger.info("locking manager for topic {}", getTopic());
305 synchronized (curLocker) {
306 changeState(new IdleState(this));
311 * Indicates that the controller has been unlocked. Enters the start state, if the
312 * controller is running.
314 public void afterUnlock() {
315 logger.info("unlocking manager for topic {}", getTopic());
317 synchronized (curLocker) {
318 if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
319 changeState(new StartState(this));
325 * Changes the finite state machine to a new state, provided the new state is not
328 * @param newState new state, or {@code null} if to remain unchanged
330 private void changeState(State newState) {
331 if (newState != null) {
332 current.cancelTimers();
335 // set the filter before starting the state
336 setFilter(newState.getFilter());
342 * Sets the server-side filter for the internal topic.
344 * @param filter new filter to be used
346 private void setFilter(Map<String, Object> filter) {
348 dmaapMgr.setFilter(serializer.encodeFilter(filter));
350 } catch (JsonParseException e) {
351 logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
353 } catch (PoolingFeatureException e) {
354 logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
359 public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
360 // wrap the task in a TimerAction and schedule it
361 ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
363 // wrap the future in a "CancellableScheduledTask"
364 return () -> fut.cancel(false);
368 public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
369 // wrap the task in a TimerAction and schedule it
370 ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
371 TimeUnit.MILLISECONDS);
373 // wrap the future in a "CancellableScheduledTask"
374 return () -> fut.cancel(false);
378 public void publishAdmin(Message msg) {
379 publish(Message.ADMIN, msg);
383 public void publish(String channel, Message msg) {
384 logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
386 msg.setChannel(channel);
389 // ensure it's valid before we send it
392 String txt = serializer.encodeMsg(msg);
393 dmaapMgr.publish(txt);
395 } catch (JsonParseException e) {
396 logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
398 } catch (PoolingFeatureException e) {
399 logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
404 * Handles an event from the internal topic.
406 * @param commType comm infrastructure
407 * @param topic2 topic
411 public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
414 logger.error("null event on topic {}", topic);
418 synchronized (curLocker) {
419 // it's on the internal topic
420 handleInternal(event);
425 * Called by the PolicyController before it offers the event to the DroolsController.
426 * If the controller is locked, then it isn't processing events. However, they still
427 * need to be forwarded, thus in that case, they are decoded and forwarded.
429 * <p>On the other hand, if the controller is not locked, then we just return immediately
430 * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
431 * it instead, as it already has the decoded message.
433 * @param protocol protocol
434 * @param topic2 topic
436 * @return {@code true} if the event was handled by the manager, {@code false} if it
437 * must still be handled by the invoker
439 public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
441 if (!controller.isLocked() || !intercept) {
442 // we should NOT intercept this message - let the invoker handle it
446 return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
450 * Called by the DroolsController before it inserts the event into the rule engine.
452 * @param protocol protocol
453 * @param topic2 topic
454 * @param event original event text, as received from the Bus
455 * @param event2 event, as an object
456 * @return {@code true} if the event was handled by the manager, {@code false} if it
457 * must still be handled by the invoker
459 public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
462 // we should NOT intercept this message - let the invoker handle it
466 return handleExternal(protocol, topic2, event, extractRequestId(event2));
470 * Handles an event from an external topic.
472 * @param protocol protocol
473 * @param topic2 topic
475 * @param reqid request id extracted from the event, or {@code null} if it couldn't be
477 * @return {@code true} if the event was handled by the manager, {@code false} if it
478 * must still be handled by the invoker
480 private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
482 // no request id - let the invoker handle it
486 if (reqid.isEmpty()) {
487 logger.warn("handle locally due to empty request id for topic {}", topic2);
488 // no request id - let the invoker handle it
492 Forward ev = makeForward(protocol, topic2, event, reqid);
494 // invalid args - consume the message
495 logger.warn("constructed an invalid Forward message on topic {}", getTopic());
499 synchronized (curLocker) {
500 return handleExternal(ev);
505 * Handles an event from an external topic.
508 * @return {@code true} if the event was handled, {@code false} if the invoker should
511 private boolean handleExternal(Forward event) {
512 if (assignments == null) {
513 // no bucket assignments yet - handle locally
514 logger.info("handle event locally for request {}", event.getRequestId());
516 // we did NOT consume the event
520 return handleEvent(event);
525 * Handles a {@link Forward} event, possibly forwarding it again.
528 * @return {@code true} if the event was handled, {@code false} if the invoker should
531 private boolean handleEvent(Forward event) {
532 String target = assignments.getAssignedHost(event.getRequestId().hashCode());
534 if (target == null) {
536 * This bucket has no assignment - just discard the event
538 logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
542 if (target.equals(host)) {
544 * Message belongs to this host - allow the controller to handle it.
546 logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
550 // forward to a different host, if hop count has been exhausted
551 if (event.getNumHops() > MAX_HOPS) {
552 logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
556 logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
558 publish(target, event);
561 // either way, consume the event
566 * Extract the request id from an event object.
568 * @param event the event object, or {@code null}
569 * @return the event's request id, or {@code null} if it can't be extracted
571 private String extractRequestId(Object event) {
576 Object reqid = extractors.extract(event);
577 return (reqid != null ? reqid.toString() : null);
581 * Decodes an event from a String into an event Object.
583 * @param topic2 topic
585 * @return the decoded event object, or {@code null} if it can't be decoded
587 private Object decodeEvent(String topic2, String event) {
588 DroolsController drools = controller.getDrools();
590 // check if this topic has a decoder
592 if (!canDecodeEvent(drools, topic2)) {
594 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
595 drools.getArtifactId());
602 return decodeEventWrapper(drools, topic2, event);
604 } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
605 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
611 * Makes a {@link Forward}, and validates its contents.
613 * @param protocol protocol
614 * @param topic2 topic
616 * @param reqid request id
617 * @return a new message, or {@code null} if the message was invalid
619 private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
621 Forward ev = new Forward(host, protocol, topic2, event, reqid);
623 // required for the validity check
630 } catch (PoolingFeatureException e) {
631 logger.error("invalid message for topic {}", topic2, e);
637 public void handle(Forward event) {
638 synchronized (curLocker) {
639 if (!handleExternal(event)) {
640 // this host should handle it - inject it
647 * Injects an event into the controller.
651 private void inject(Forward event) {
652 logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
656 listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
664 * Handles an event from the internal topic. This uses reflection to identify the
665 * appropriate process() method to invoke, based on the type of Message that was
668 * @param event the serialized {@link Message} read from the internal topic
670 private void handleInternal(String event) {
671 Class<?> clazz = null;
674 Message msg = serializer.decodeMsg(event);
676 // get the class BEFORE checking the validity
677 clazz = msg.getClass();
681 Method meth = current.getClass().getMethod("process", msg.getClass());
682 changeState((State) meth.invoke(current, msg));
684 } catch (JsonParseException e) {
685 logger.warn("failed to decode message for topic {}", topic, e);
687 } catch (NoSuchMethodException | SecurityException e) {
688 logger.error("no processor for message {} for topic {}", clazz, topic, e);
690 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
691 | PoolingFeatureException e) {
692 logger.error("failed to process message {} for topic {}", clazz, topic, e);
697 public void startDistributing(BucketAssignments asgn) {
698 synchronized (curLocker) {
699 int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
700 logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
706 public BucketAssignments getAssignments() {
711 public State goStart() {
712 return new StartState(this);
716 public State goQuery() {
717 return new QueryState(this);
721 public State goActive() {
722 activeLatch.countDown();
723 return new ActiveState(this);
727 public State goInactive() {
728 return new InactiveState(this);
732 * Action to run a timer task. Only runs the task if the machine is still in the state
733 * that it was in when the timer was created.
735 private class TimerAction implements Runnable {
738 * State of the machine when the timer was created.
740 private State origState;
743 * Task to be executed.
745 private StateTimerTask task;
750 * @param task task to execute when this timer runs
752 public TimerAction(StateTimerTask task) {
753 this.origState = current;
759 synchronized (curLocker) {
760 if (current == origState) {
761 changeState(task.fire());
768 * The remaining methods may be overridden by junit tests.
772 * Creates object extractors.
774 * @param props properties used to configure the extractors
775 * @return a new set of extractors
777 protected ClassExtractors makeClassExtractors(Properties props) {
778 return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
779 PoolingProperties.EXTRACTOR_TYPE);
783 * Creates a DMaaP manager.
785 * @param topic name of the internal DMaaP topic
786 * @return a new DMaaP manager
787 * @throws PoolingFeatureException if an error occurs
789 protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
790 return new DmaapManager(topic);
794 * Creates a scheduled thread pool.
796 * @return a new scheduled thread pool
798 protected ScheduledThreadPoolExecutor makeScheduler() {
799 return new ScheduledThreadPoolExecutor(1);
803 * Determines if the event can be decoded.
805 * @param drools drools controller
806 * @param topic topic on which the event was received
807 * @return {@code true} if the event can be decoded, {@code false} otherwise
809 protected boolean canDecodeEvent(DroolsController drools, String topic) {
810 return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic);
816 * @param drools drools controller
817 * @param topic topic on which the event was received
818 * @param event event text to be decoded
819 * @return the decoded event
820 * @throws IllegalArgumentException illegal argument
821 * @throw UnsupportedOperationException unsupported operation
822 * @throws IllegalStateException illegal state
824 protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
825 return EventProtocolCoder.manager.decode(drools.getGroupId(), drools.getArtifactId(), topic, event);