2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling.state;
import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.onap.policy.drools.pooling.CancellableScheduledTask;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A state in the finite state machine.
 *
 * <p>A state may have several timers associated with it, which must be cancelled whenever
 * the state is changed. Assumes that timers are not continuously added to the same state.
 */
50 public abstract class State {
52 private static final Logger logger = LoggerFactory.getLogger(State.class);
57 private final PoolingManager mgr;
60 * Timers added by this state.
62 private final List<CancellableScheduledTask> timers = new LinkedList<>();
/**
 * Constructs the state.
 *
 * @param mgr pooling manager
 */
public State(PoolingManager mgr) {
    // mgr is a final field with no initializer, so it has to be assigned here
    this.mgr = mgr;
}
74 * Gets the server-side filter to use when polling the DMaaP internal topic. The
75 * default method returns a filter that accepts messages on the admin channel and on
76 * the host's own channel.
78 * @return the server-side filter to use.
80 @SuppressWarnings("unchecked")
81 public Map<String, Object> getFilter() {
82 return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
86 * Cancels the timers added by this state.
88 public final void cancelTimers() {
89 timers.forEach(timer -> timer.cancel());
93 * Starts the state. The default method simply logs a message and returns.
96 logger.info("entered {} for topic {}", getClass().getSimpleName(), getTopic());
100 * Transitions to the "start" state.
102 * @return the new state
104 public final State goStart() {
105 return mgr.goStart();
109 * Transitions to the "query" state.
111 * @return the new state
113 public State goQuery() {
114 return mgr.goQuery();
118 * Goes active with a new set of assignments.
120 * @param asgn new assignments
121 * @return the new state, either Active or Inactive, depending on whether or not this
122 * host has an assignment
124 protected State goActive(BucketAssignments asgn) {
125 startDistributing(asgn);
127 if (asgn != null && asgn.hasAssignment(getHost())) {
128 return mgr.goActive();
136 * Transitions to the "inactive" state.
138 * @return the new state
140 protected State goInactive() {
141 return mgr.goInactive();
145 * Processes a message. The default method passes it to the manager to handle and
146 * returns {@code null}.
148 * @param msg message to be processed
149 * @return the new state, or {@code null} if the state is unchanged
151 public State process(Forward msg) {
152 if (!getHost().equals(msg.getChannel())) {
153 logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
158 logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
164 * Processes a message. The default method just returns {@code null}.
166 * @param msg message to be processed
167 * @return the new state, or {@code null} if the state is unchanged
169 public State process(Heartbeat msg) {
170 logger.info("ignored heartbeat message from {} on topic {}", msg.getSource(), getTopic());
175 * Processes a message. The default method just returns {@code null}.
177 * @param msg message to be processed
178 * @return the new state, or {@code null} if the state is unchanged
180 public State process(Identification msg) {
181 logger.info("ignored ident message from {} on topic {}", msg.getSource(), getTopic());
186 * Processes a message. The default method copies the assignments and then returns
189 * @param msg message to be processed
190 * @return the new state, or {@code null} if the state is unchanged
192 public State process(Leader msg) {
194 logger.info("extract assignments from Leader message from {} on topic {}", msg.getSource(), getTopic());
195 startDistributing(msg.getAssignments());
202 * Processes a message. The default method just returns {@code null}.
204 * @param msg message to be processed
205 * @return the new state, or {@code null} if the state is unchanged
207 public State process(Offline msg) {
208 logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
213 * Processes a message. The default method just returns {@code null}.
215 * @param msg message to be processed
216 * @return the new state, or {@code null} if the state is unchanged
218 public State process(Query msg) {
219 logger.info("ignored Query message from {} on topic {}", msg.getSource(), getTopic());
224 * Determines if a message is valid and did not originate from this host.
226 * @param msg message to be validated
227 * @return {@code true} if the message is valid, {@code false} otherwise
229 protected boolean isValid(Leader msg) {
230 BucketAssignments asgn = msg.getAssignments();
232 logger.warn("Leader message from {} has no assignments for topic {}", msg.getSource(), getTopic());
236 // ignore Leader messages from ourself
237 String source = msg.getSource();
238 if (source == null || source.equals(getHost())) {
239 logger.debug("ignore Leader message from {} for topic {}", msg.getSource(), getTopic());
243 // the new leader must equal the source
244 boolean result = source.equals(asgn.getLeader());
247 logger.warn("Leader message from {} has an invalid assignment for topic {}", msg.getSource(), getTopic());
254 * Publishes a message.
256 * @param msg message to be published
258 protected final void publish(Identification msg) {
259 mgr.publishAdmin(msg);
263 * Publishes a message.
265 * @param msg message to be published
267 protected final void publish(Leader msg) {
268 mgr.publishAdmin(msg);
272 * Publishes a message.
274 * @param msg message to be published
276 protected final void publish(Offline msg) {
277 mgr.publishAdmin(msg);
281 * Publishes a message.
283 * @param msg message to be published
285 protected final void publish(Query msg) {
286 mgr.publishAdmin(msg);
290 * Publishes a message on the specified channel.
292 * @param channel channel
293 * @param msg message to be published
295 protected final void publish(String channel, Forward msg) {
296 mgr.publish(channel, msg);
300 * Publishes a message on the specified channel.
302 * @param channel channel
303 * @param msg message to be published
305 protected final void publish(String channel, Heartbeat msg) {
306 mgr.publish(channel, msg);
310 * Starts distributing messages using the specified bucket assignments.
312 * @param assignments assignments
314 protected final void startDistributing(BucketAssignments assignments) {
315 if (assignments != null) {
316 mgr.startDistributing(assignments);
321 * Schedules a timer to fire after a delay.
323 * @param delayMs delay in ms
326 protected final void schedule(long delayMs, StateTimerTask task) {
327 timers.add(mgr.schedule(delayMs, task));
331 * Schedules a timer to fire repeatedly.
333 * @param initialDelayMs initial delay ms
334 * @param delayMs delay ms
337 protected final void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
338 timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
342 * Indicates that we failed to see our own heartbeat; must be a problem with the
343 * internal topic. Assumes the problem is temporary and continues to use the current
344 * bucket assignments.
346 * @return a new {@link StartState}
348 protected final State missedHeartbeat() {
349 publish(makeOffline());
351 return mgr.goStart();
355 * Indicates that the internal topic failed; this should only be invoked from the
356 * StartState. Discards bucket assignments and begins processing everything locally.
358 * @return a new {@link InactiveState}
360 protected final State internalTopicFailed() {
361 publish(makeOffline());
362 mgr.startDistributing(null);
364 return mgr.goInactive();
368 * Makes a heart beat message.
370 * @param timestampMs time, in milliseconds, associated with the message
372 * @return a new message
374 protected final Heartbeat makeHeartbeat(long timestampMs) {
375 return new Heartbeat(getHost(), timestampMs);
379 * Makes an Identification message.
381 * @return a new message
383 protected Identification makeIdentification() {
384 return new Identification(getHost(), getAssignments());
388 * Makes an "offline" message.
390 * @return a new message
392 protected final Offline makeOffline() {
393 return new Offline(getHost());
397 * Makes a query message.
399 * @return a new message
401 protected final Query makeQuery() {
402 return new Query(getHost());
405 public final BucketAssignments getAssignments() {
406 return mgr.getAssignments();
409 public final String getHost() {
410 return mgr.getHost();
413 public final String getTopic() {
414 return mgr.getTopic();
417 public final PoolingProperties getProperties() {
418 return mgr.getProperties();