2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling.state;
24 import java.util.LinkedList;
25 import java.util.List;
26 import org.onap.policy.drools.pooling.CancellableScheduledTask;
27 import org.onap.policy.drools.pooling.PoolingManager;
28 import org.onap.policy.drools.pooling.PoolingProperties;
29 import org.onap.policy.drools.pooling.message.BucketAssignments;
30 import org.onap.policy.drools.pooling.message.Heartbeat;
31 import org.onap.policy.drools.pooling.message.Identification;
32 import org.onap.policy.drools.pooling.message.Leader;
33 import org.onap.policy.drools.pooling.message.Offline;
34 import org.onap.policy.drools.pooling.message.Query;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * A state in the finite state machine.
41 * <p>A state may have several timers associated with it, which must be cancelled whenever
42 * the state is changed. Assumes that timers are not continuously added to the same state.
44 public abstract class State {
46 private static final Logger logger = LoggerFactory.getLogger(State.class);
51 private final PoolingManager mgr;
54 * Timers added by this state.
56 private final List<CancellableScheduledTask> timers = new LinkedList<>();
61 * @param mgr pooling manager
63 protected State(PoolingManager mgr) {
68 * Cancels the timers added by this state.
70 public final void cancelTimers() {
71 timers.forEach(CancellableScheduledTask::cancel);
75 * Starts the state. The default method simply logs a message and returns.
78 logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
82 * Transitions to the "start" state.
84 * @return the new state
86 public final State goStart() {
91 * Transitions to the "query" state.
93 * @return the new state
95 public State goQuery() {
100 * Goes active with a new set of assignments.
102 * @param asgn new assignments
103 * @return the new state, either Active or Inactive, depending on whether or not this
104 * host has an assignment
106 protected State goActive(BucketAssignments asgn) {
107 startDistributing(asgn);
109 if (asgn != null && asgn.hasAssignment(getHost())) {
110 return mgr.goActive();
118 * Transitions to the "inactive" state.
120 * @return the new state
122 protected State goInactive() {
123 return mgr.goInactive();
127 * Processes a message. The default method just returns {@code null}.
129 * @param msg message to be processed
130 * @return the new state, or {@code null} if the state is unchanged
132 public State process(Heartbeat msg) {
133 logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
138 * Processes a message. The default method just returns {@code null}.
140 * @param msg message to be processed
141 * @return the new state, or {@code null} if the state is unchanged
143 public State process(Identification msg) {
144 logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
149 * Processes a message. The default method copies the assignments and then returns
152 * @param msg message to be processed
153 * @return the new state, or {@code null} if the state is unchanged
155 public State process(Leader msg) {
157 logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
158 startDistributing(msg.getAssignments());
165 * Processes a message. The default method just returns {@code null}.
167 * @param msg message to be processed
168 * @return the new state, or {@code null} if the state is unchanged
170 public State process(Offline msg) {
171 logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
176 * Processes a message. The default method just returns {@code null}.
178 * @param msg message to be processed
179 * @return the new state, or {@code null} if the state is unchanged
181 public State process(Query msg) {
182 logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
187 * Determines if a message is valid and did not originate from this host.
189 * @param msg message to be validated
190 * @return {@code true} if the message is valid, {@code false} otherwise
192 protected boolean isValid(Leader msg) {
193 BucketAssignments asgn = msg.getAssignments();
195 logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
199 // ignore Leader messages from ourself
200 String source = msg.getSource();
201 if (source == null || source.equals(getHost())) {
202 logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
206 // the new leader must equal the source
207 boolean result = source.equals(asgn.getLeader());
210 logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
217 * Publishes a message.
219 * @param msg message to be published
221 protected final void publish(Identification msg) {
222 mgr.publishAdmin(msg);
226 * Publishes a message.
228 * @param msg message to be published
230 protected final void publish(Leader msg) {
231 mgr.publishAdmin(msg);
235 * Publishes a message.
237 * @param msg message to be published
239 protected final void publish(Offline msg) {
240 mgr.publishAdmin(msg);
244 * Publishes a message.
246 * @param msg message to be published
248 protected final void publish(Query msg) {
249 mgr.publishAdmin(msg);
253 * Publishes a message on the specified channel.
255 * @param channel channel
256 * @param msg message to be published
258 protected final void publish(String channel, Heartbeat msg) {
259 mgr.publish(channel, msg);
263 * Starts distributing messages using the specified bucket assignments.
265 * @param assignments assignments
267 protected final void startDistributing(BucketAssignments assignments) {
268 if (assignments != null) {
269 mgr.startDistributing(assignments);
274 * Schedules a timer to fire after a delay.
276 * @param delayMs delay in ms
279 protected final void schedule(long delayMs, StateTimerTask task) {
280 timers.add(mgr.schedule(delayMs, task));
284 * Schedules a timer to fire repeatedly.
286 * @param initialDelayMs initial delay ms
287 * @param delayMs delay ms
290 protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
291 timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
295 * Indicates that we failed to see our own heartbeat; must be a problem with the
296 * internal topic. Assumes the problem is temporary and continues to use the current
297 * bucket assignments.
299 * @return a new {@link StartState}
301 protected final State missedHeartbeat() {
302 publish(makeOffline());
304 return mgr.goStart();
308 * Indicates that the internal topic failed; this should only be invoked from the
309 * StartState. Discards bucket assignments and begins processing everything locally.
311 * @return a new {@link InactiveState}
313 protected final State internalTopicFailed() {
314 publish(makeOffline());
315 mgr.startDistributing(null);
317 return mgr.goInactive();
321 * Makes a heart beat message.
323 * @param timestampMs time, in milliseconds, associated with the message
325 * @return a new message
327 protected final Heartbeat makeHeartbeat(long timestampMs) {
328 return new Heartbeat(getHost(), timestampMs);
332 * Makes an Identification message.
334 * @return a new message
336 protected Identification makeIdentification() {
337 return new Identification(getHost(), getAssignments());
341 * Makes an "offline" message.
343 * @return a new message
345 protected final Offline makeOffline() {
346 return new Offline(getHost());
350 * Makes a query message.
352 * @return a new message
354 protected final Query makeQuery() {
355 return new Query(getHost());
358 public final BucketAssignments getAssignments() {
359 return mgr.getAssignments();
362 public final String getHost() {
363 return mgr.getHost();
366 public final String getTopic() {
367 return mgr.getTopic();
370 public final PoolingProperties getProperties() {
371 return mgr.getProperties();