2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling.state;
23 import java.util.LinkedList;
24 import java.util.List;
25 import org.onap.policy.drools.pooling.CancellableScheduledTask;
26 import org.onap.policy.drools.pooling.PoolingManager;
27 import org.onap.policy.drools.pooling.PoolingProperties;
28 import org.onap.policy.drools.pooling.message.BucketAssignments;
29 import org.onap.policy.drools.pooling.message.Heartbeat;
30 import org.onap.policy.drools.pooling.message.Identification;
31 import org.onap.policy.drools.pooling.message.Leader;
32 import org.onap.policy.drools.pooling.message.Offline;
33 import org.onap.policy.drools.pooling.message.Query;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * A state in the finite state machine.
40 * <p>A state may have several timers associated with it, which must be cancelled whenever
41 * the state is changed. Assumes that timers are not continuously added to the same state.
// Abstract base class for the states of the pooling finite state machine. Each state
// holds the pooling manager it delegates to and the timers it has scheduled.
43 public abstract class State {
// Logger shared by every state instance; used for the default "ignored ..." messages.
45 private static final Logger logger = LoggerFactory.getLogger(State.class);
// Pooling manager used by this state for transitions, publishing and scheduling.
50 private final PoolingManager mgr;
53 * Timers added by this state.
// Tasks returned by the manager's schedule calls; cancelTimers() cancels each of them.
55 private final List<CancellableScheduledTask> timers = new LinkedList<>();
60 * @param mgr pooling manager
// Constructor, callable only by subclasses and same-package code.
// NOTE(review): presumably stores mgr in the final field of the same name - confirm.
62 protected State(PoolingManager mgr) {
67 * Cancels the timers added by this state.
// Cancels every task this state has recorded in its timers list. Declared final, so
// subclasses cannot change how timers are cancelled.
69 public final void cancelTimers() {
// one cancel() call per recorded task, in list order
70 timers.forEach(CancellableScheduledTask::cancel);
74 * Starts the state. The default method simply logs a message and returns.
// Logs, at INFO level, the simple class name of the concrete state together with the
// topic, so the log shows which state was entered.
77 logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
81 * Transitions to the "start" state.
83 * @return the new state
// Transition to the "start" state; final, so subclasses cannot override it.
// NOTE(review): presumably delegates to the pooling manager, as goInactive() does - confirm.
85 public final State goStart() {
90 * Transitions to the "query" state.
92 * @return the new state
// Transition to the "query" state. Not final, so a subclass may override it.
// NOTE(review): presumably delegates to the pooling manager, as goInactive() does - confirm.
94 public State goQuery() {
99 * Goes active with a new set of assignments.
101 * @param asgn new assignments
102 * @return the new state, either Active or Inactive, depending on whether or not this
103 * host has an assignment
// Begins distributing with the given assignments, then chooses the next state based on
// whether this host appears in them.
105 protected State goActive(BucketAssignments asgn) {
// startDistributing() itself skips null assignments, so a null argument is safe here
106 startDistributing(asgn);
// go active only when assignments exist and this host has an assignment in them
108 if (asgn != null && asgn.hasAssignment(getHost())) {
109 return mgr.goActive();
// NOTE(review): the fallback path (host has no assignment) should yield the inactive
// state, per the method's Javadoc - confirm.
117 * Transitions to the "inactive" state.
119 * @return the new state
// Transition to the "inactive" state by asking the pooling manager for it.
121 protected State goInactive() {
122 return mgr.goInactive();
126 * Processes a message. The default method just returns {@code null}.
128 * @param msg message to be processed
129 * @return the new state, or {@code null} if the state is unchanged
// Default Heartbeat handler: records at INFO level that the message was ignored,
// including its source and this state's topic. Subclasses may override.
131 public State process(Heartbeat msg) {
132 logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
137 * Processes a message. The default method just returns {@code null}.
139 * @param msg message to be processed
140 * @return the new state, or {@code null} if the state is unchanged
// Default Identification handler: records at INFO level that the message was ignored,
// including its source and this state's topic. Subclasses may override.
142 public State process(Identification msg) {
143 logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
148 * Processes a message. The default method copies the assignments and then returns
151 * @param msg message to be processed
152 * @return the new state, or {@code null} if the state is unchanged
// Default Leader handler: logs the source and topic, then starts distributing with the
// assignments carried by the message.
// NOTE(review): presumably only done for messages that pass isValid(msg) - confirm.
154 public State process(Leader msg) {
156 logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
// null assignments are skipped inside startDistributing()
157 startDistributing(msg.getAssignments());
164 * Processes a message. The default method just returns {@code null}.
166 * @param msg message to be processed
167 * @return the new state, or {@code null} if the state is unchanged
// Default Offline handler: records at INFO level that the message was ignored,
// including its source and this state's topic. Subclasses may override.
169 public State process(Offline msg) {
170 logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
175 * Processes a message. The default method just returns {@code null}.
177 * @param msg message to be processed
178 * @return the new state, or {@code null} if the state is unchanged
// Default Query handler: records at INFO level that the message was ignored,
// including its source and this state's topic. Subclasses may override.
180 public State process(Query msg) {
181 logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
186 * Determines if a message is valid and did not originate from this host.
188 * @param msg message to be validated
189 * @return {@code true} if the message is valid, {@code false} otherwise
// Validates a Leader message in three steps: it must carry assignments, it must come
// from a different host, and its source must be the leader named in the assignments.
191 protected boolean isValid(Leader msg) {
192 BucketAssignments asgn = msg.getAssignments();
// NOTE(review): this warning presumably sits behind a null check on asgn that rejects
// the message - confirm.
194 logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
198 // ignore Leader messages from ourself
199 String source = msg.getSource();
// a message without a source is treated the same as one sent by this host
200 if (source == null || source.equals(getHost())) {
201 logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
205 // the new leader must equal the source
// source is known to be non-null here, so equals() cannot throw
206 boolean result = source.equals(asgn.getLeader());
// NOTE(review): this warning is presumably emitted only when result is false - confirm.
209 logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
216 * Publishes a message.
218 * @param msg message to be published
// Hands an Identification message to the pooling manager's publishAdmin().
220 protected final void publish(Identification msg) {
221 mgr.publishAdmin(msg);
225 * Publishes a message.
227 * @param msg message to be published
// Hands a Leader message to the pooling manager's publishAdmin().
229 protected final void publish(Leader msg) {
230 mgr.publishAdmin(msg);
234 * Publishes a message.
236 * @param msg message to be published
// Hands an Offline message to the pooling manager's publishAdmin().
238 protected final void publish(Offline msg) {
239 mgr.publishAdmin(msg);
243 * Publishes a message.
245 * @param msg message to be published
// Hands a Query message to the pooling manager's publishAdmin().
247 protected final void publish(Query msg) {
248 mgr.publishAdmin(msg);
252 * Publishes a message on the specified channel.
254 * @param channel channel
255 * @param msg message to be published
// Unlike the other publish() overloads, a Heartbeat is sent on a caller-chosen channel
// via the manager's publish(channel, msg) rather than publishAdmin().
257 protected final void publish(String channel, Heartbeat msg) {
258 mgr.publish(channel, msg);
262 * Starts distributing messages using the specified bucket assignments.
264 * @param assignments assignments
// Tells the pooling manager to start distributing with the given assignments.
266 protected final void startDistributing(BucketAssignments assignments) {
// null assignments are skipped here; internalTopicFailed() passes null to the manager
// directly instead of going through this method
267 if (assignments != null) {
268 mgr.startDistributing(assignments);
273 * Schedules a timer to fire after a delay.
275 * @param delayMs delay in ms
// Schedules the task through the pooling manager and records the returned task in the
// timers list so that cancelTimers() can cancel it later.
278 protected final void schedule(long delayMs, StateTimerTask task) {
279 timers.add(mgr.schedule(delayMs, task));
283 * Schedules a timer to fire repeatedly.
285 * @param initialDelayMs initial delay ms
286 * @param delayMs delay ms
// Schedules a repeating task through the pooling manager and records the returned task
// in the timers list so that cancelTimers() can cancel it later.
289 protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
290 timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
294 * Indicates that we failed to see our own heartbeat; must be a problem with the
295 * internal topic. Assumes the problem is temporary and continues to use the current
296 * bucket assignments.
298 * @return a new {@link StartState}
// Publishes an Offline message for this host, then returns the manager's start state.
// Distribution is left untouched here, unlike in internalTopicFailed().
300 protected final State missedHeartbeat() {
301 publish(makeOffline());
303 return mgr.goStart();
307 * Indicates that the internal topic failed; this should only be invoked from the
308 * StartState. Discards bucket assignments and begins processing everything locally.
310 * @return a new {@link InactiveState}
// Publishes an Offline message for this host, passes null assignments to the manager,
// and returns the manager's inactive state.
312 protected final State internalTopicFailed() {
313 publish(makeOffline());
// calls the manager directly: this class's own startDistributing() would skip null
314 mgr.startDistributing(null);
316 return mgr.goInactive();
320 * Makes a heart beat message.
322 * @param timestampMs time, in milliseconds, associated with the message
324 * @return a new message
// Builds a Heartbeat from this host's name and the supplied timestamp.
326 protected final Heartbeat makeHeartbeat(long timestampMs) {
327 return new Heartbeat(getHost(), timestampMs);
331 * Makes an Identification message.
333 * @return a new message
// Builds an Identification from this host's name and the manager's current assignments.
// Not final, unlike the other make*() factories, so a subclass may override it.
335 protected Identification makeIdentification() {
336 return new Identification(getHost(), getAssignments());
340 * Makes an "offline" message.
342 * @return a new message
// Builds an Offline message carrying this host's name.
344 protected final Offline makeOffline() {
345 return new Offline(getHost());
349 * Makes a query message.
351 * @return a new message
// Builds a Query message carrying this host's name.
353 protected final Query makeQuery() {
354 return new Query(getHost());
// Returns whatever the pooling manager reports as its assignments.
357 public final BucketAssignments getAssignments() {
358 return mgr.getAssignments();
// Returns the host name reported by the pooling manager; used as the source of the
// messages built by the make*() factories.
361 public final String getHost() {
362 return mgr.getHost();
// Returns the topic reported by the pooling manager; it appears in this class's log
// messages.
365 public final String getTopic() {
366 return mgr.getTopic();
// Returns the pooling properties held by the pooling manager.
369 public final PoolingProperties getProperties() {
370 return mgr.getProperties();