/*
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.policy.drools.pooling.state;
import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.PoolingProperties;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Forward;
import org.onap.policy.drools.pooling.message.Heartbeat;
import org.onap.policy.drools.pooling.message.Identification;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
import org.onap.policy.drools.pooling.message.Offline;
import org.onap.policy.drools.pooling.message.Query;
/**
* A state in the finite state machine.
*
* A state may have several timers associated with it, which must be cancelled
* whenever the state is changed. Assumes that timers are not continuously added
* to the same state.
*/
public abstract class State {
/**
* Host pool manager.
*/
private final PoolingManager mgr;
/**
* Timers added by this state.
*/
private final List> timers = new LinkedList<>();
/**
*
* @param mgr
*/
public State(PoolingManager mgr) {
this.mgr = mgr;
}
/**
* Gets the server-side filter to use when polling the DMaaP internal topic.
* The default method returns a filter that accepts messages on the admin
* channel and on the host's own channel.
*
* @return the server-side filter to use.
*/
@SuppressWarnings("unchecked")
public Map getFilter() {
return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
}
/**
* Cancels the timers added by this state.
*/
public void cancelTimers() {
for (ScheduledFuture> fut : timers) {
fut.cancel(false);
}
}
/**
* Starts the state.
*/
public void start() {
}
/**
* Indicates that the finite state machine is stopping. Sends an "offline"
* message to the other hosts.
*/
public void stop() {
publish(makeOffline());
}
/**
* Transitions to the "start" state.
*
* @return the new state
*/
public State goStart() {
return mgr.goStart();
}
/**
* Transitions to the "query" state.
*
* @return the new state
*/
public State goQuery() {
return mgr.goQuery();
}
/**
* Transitions to the "active" state.
*
* @return the new state
*/
public State goActive() {
return mgr.goActive();
}
/**
* Transitions to the "inactive" state.
*
* @return the new state
*/
protected State goInactive() {
return mgr.goInactive();
}
/**
* Processes a message. The default method passes it to the manager to
* handle and returns {@code null}.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Forward msg) {
mgr.handle(msg);
return null;
}
/**
* Processes a message. The default method just returns {@code null}.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Heartbeat msg) {
return null;
}
/**
* Processes a message. The default method just returns {@code null}.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Identification msg) {
return null;
}
/**
* Processes a message. If this host has a new assignment, then it
* transitions to the active state. Otherwise, it transitions to the
* inactive state.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Leader msg) {
BucketAssignments asgn = msg.getAssignments();
if (asgn == null) {
return null;
}
String source = msg.getSource();
if (source == null) {
return null;
}
// the new leader must equal the source
if (source.equals(asgn.getLeader())) {
startDistributing(asgn);
if (asgn.hasAssignment(getHost())) {
return goActive();
} else {
return goInactive();
}
}
return null;
}
/**
* Processes a message. The default method just returns {@code null}.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Offline msg) {
return null;
}
/**
* Processes a message. The default method just returns {@code null}.
*
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Query msg) {
return null;
}
/**
* Publishes a message.
*
* @param msg message to be published
*/
protected void publish(Identification msg) {
mgr.publishAdmin(msg);
}
/**
* Publishes a message.
*
* @param msg message to be published
*/
protected void publish(Leader msg) {
mgr.publishAdmin(msg);
}
/**
* Publishes a message.
*
* @param msg message to be published
*/
protected void publish(Offline msg) {
mgr.publishAdmin(msg);
}
/**
* Publishes a message.
*
* @param msg message to be published
*/
protected void publish(Query msg) {
mgr.publishAdmin(msg);
}
/**
* Publishes a message on the specified channel.
*
* @param channel
* @param msg message to be published
*/
protected void publish(String channel, Forward msg) {
mgr.publish(channel, msg);
}
/**
* Publishes a message on the specified channel.
*
* @param channel
* @param msg message to be published
*/
protected void publish(String channel, Heartbeat msg) {
mgr.publish(channel, msg);
}
/**
* Starts distributing messages using the specified bucket assignments.
*
* @param assignments
*/
protected void startDistributing(BucketAssignments assignments) {
if (assignments != null) {
mgr.startDistributing(assignments);
}
}
/**
* Schedules a timer to fire after a delay.
*
* @param delayMs
* @param task
*/
protected void schedule(long delayMs, StateTimerTask task) {
timers.add(mgr.schedule(delayMs, task));
}
/**
* Schedules a timer to fire repeatedly.
*
* @param initialDelayMs
* @param delayMs
* @param task
*/
protected void scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
timers.add(mgr.scheduleWithFixedDelay(initialDelayMs, delayMs, task));
}
/**
* Indicates that the internal topic failed.
*
* @return a new {@link InactiveState}
*/
protected State internalTopicFailed() {
publish(makeOffline());
mgr.internalTopicFailed();
return mgr.goInactive();
}
/**
* Makes a heart beat message.
*
* @param timestampMs time, in milliseconds, associated with the message
*
* @return a new message
*/
protected Heartbeat makeHeartbeat(long timestampMs) {
return new Heartbeat(getHost(), timestampMs);
}
/**
* Makes an "offline" message.
*
* @return a new message
*/
protected Offline makeOffline() {
return new Offline(getHost());
}
/**
* Makes a query message.
*
* @return a new message
*/
protected Query makeQuery() {
return new Query(getHost());
}
public final BucketAssignments getAssignments() {
return mgr.getAssignments();
}
public final String getHost() {
return mgr.getHost();
}
public final String getTopic() {
return mgr.getTopic();
}
public final PoolingProperties getProperties() {
return mgr.getProperties();
}
}