2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling.state;
23 import java.util.Arrays;
24 import java.util.TreeSet;
25 import lombok.AccessLevel;
27 import org.onap.policy.drools.pooling.PoolingManager;
28 import org.onap.policy.drools.pooling.message.Heartbeat;
29 import org.onap.policy.drools.pooling.message.Leader;
30 import org.onap.policy.drools.pooling.message.Offline;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 * The active state. In this state, this host has one or more bucket assignments and
36 * processes any events associated with one of its buckets. Other events are forwarded to
37 * appropriate target hosts.
// Class-level @Getter with PROTECTED access (presumably Lombok's, given the lombok.AccessLevel
// import); the field-level @Getter(AccessLevel.NONE) below opts one field out.
39 @Getter(AccessLevel.PROTECTED)
40 public class ActiveState extends ProcessingState {
42 private static final Logger logger = LoggerFactory.getLogger(ActiveState.class);
45 * Set of hosts that have been assigned a bucket.
// Filled by the constructor from the manager's assignments. A TreeSet is used because the
// neighbor lookup relies on its ordering (higher/lower/first/last); entries are removed when
// an Offline message causes this host to become the leader.
47 @Getter(AccessLevel.NONE)
48 private final TreeSet<String> assigned = new TreeSet<>();
51 * Host that comes after this host, or {@code null} if it has no successor.
// Written by detmNeighbors(); read by genHeartbeat() to decide whether a second publish is needed.
53 private String succHost = null;
56 * Host that comes before this host, or "" if it has no predecessor.
// Written by detmNeighbors(). The empty string (rather than null) is the "none" marker, which is
// what lets callers use predHost.isEmpty() and predHost.equals(...) without a null check.
58 private String predHost = "";
61 * {@code True} if we saw this host's heart beat since the last check, {@code false}
// Set by process(Heartbeat) when the message source equals this host; cleared again by the
// periodic "my heart beat checker" task registered in addTimers().
64 private boolean myHeartbeatSeen = false;
67 * {@code True} if we saw the predecessor's heart beat since the last check,
68 * {@code false} otherwise.
// Set by process(Heartbeat) when the message source equals predHost; cleared again by the
// periodic predecessor checker registered in addTimers().
70 private boolean predHeartbeatSeen = false;
76 * @param mgr pooling manager
// Creates the state from the manager's current assignments: the leader recorded there is handed
// to the superclass constructor and every host in the assignment's host array becomes "assigned".
78 public ActiveState(PoolingManager mgr) {
79 super(mgr, mgr.getAssignments().getLeader());
// copy the host array into the sorted set used for neighbor lookups
81 assigned.addAll(Arrays.asList(mgr.getAssignments().getHostArray()));
// NOTE(review): the rest of the constructor is not visible in this extract. detmNeighbors() is
// private and has no visible caller, so it is presumably invoked here - confirm against the file.
87 * Determine this host's neighbors based on the order of the host UUIDs. Updates
88 * {@link #succHost} and {@link #predHost}.
// Derives succHost and predHost from the sorted set of assigned hosts, treating the set as a
// ring: the successor of the last host is the first one, and the predecessor of the first host
// is the last one.
90 private void detmNeighbors() {
// with fewer than two assigned hosts there is no other host that could be a neighbor
91 if (assigned.size() < 2) {
92 logger.info("this host has no neighbors on topic {}", getTopic());
94 * this host is the only one with any assignments - it has no neighbors
// NOTE(review): the statements that finish this branch are not visible in this extract;
// presumably they reset succHost/predHost to their "none" values and return - confirm.
// TreeSet.higher() returns the least element strictly greater than the argument, or null when
// this host sorts last - hence the wrap-around below
101 if ((succHost = assigned.higher(getHost())) == null) {
102 // wrapped around - successor is the first host in the set
103 succHost = assigned.first();
105 logger.info("this host's successor is {} on topic {}", succHost, getTopic());
// TreeSet.lower() returns the greatest element strictly less than the argument, or null when
// this host sorts first - hence the wrap-around below
107 if ((predHost = assigned.lower(getHost())) == null) {
108 // wrapped around - predecessor is the last host in the set
109 predHost = assigned.last();
111 logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
// NOTE(review): only the signature of start() is visible in this extract; its body is missing,
// so nothing can be stated about what it does. Confirm against the full file before editing.
115 public void start() {
// Registers this state's periodic tasks via scheduleWithFixedDelay(): a heartbeat generator, a
// check that this host's own heartbeat was seen, and - only when a predecessor exists - a check
// that the predecessor's heartbeat was seen.
124 private void addTimers() {
125 logger.info("add timers");
128 * heart beat generator
// the same value is passed as the first and second argument - presumably initial delay and
// repeat delay; confirm against scheduleWithFixedDelay()
130 long genMs = getProperties().getInterHeartbeatMs();
132 scheduleWithFixedDelay(genMs, genMs, () -> {
// NOTE(review): the body of the generator task is not visible in this extract.
138 * my heart beat checker
// one timeout value is shared by both checker tasks below
140 long waitMs = getProperties().getActiveHeartbeatMs();
142 scheduleWithFixedDelay(waitMs, waitMs, () -> {
// a heartbeat from this host arrived since the previous check: re-arm the flag
143 if (myHeartbeatSeen) {
144 myHeartbeatSeen = false;
// NOTE(review): the statement that ends this branch is not visible in this extract.
148 // missed my heart beat
149 logger.error("missed my heartbeat on topic {}", getTopic());
// the task's result is whatever missedHeartbeat() returns
151 return missedHeartbeat();
155 * predecessor heart beat checker
// predHost is "" when there is no predecessor, in which case no checker is scheduled
157 if (!predHost.isEmpty()) {
159 scheduleWithFixedDelay(waitMs, waitMs, () -> {
// same re-arm pattern as above, for the predecessor's flag
160 if (predHeartbeatSeen) {
161 predHeartbeatSeen = false;
165 // missed the predecessor's heart beat
166 logger.warn("missed predecessor's heartbeat on topic {}", getTopic());
// publish the message built by makeQuery()
168 publish(makeQuery());
// NOTE(review): the value returned by this task after publishing is not visible in this extract.
176 * Generates a heart beat for this host and its successor.
// Builds a single heartbeat message stamped with the current wall-clock time and publishes it
// for this host and, when a successor exists, for the successor as well.
178 private void genHeartbeat() {
// one message instance is reused for both publishes
179 var msg = makeHeartbeat(System.currentTimeMillis());
// heartbeat for ourselves; process(Heartbeat) sets myHeartbeatSeen when it sees this host as source
180 publish(getHost(), msg);
// succHost is null when this host has no successor
182 if (succHost != null) {
183 publish(succHost, msg);
// Handles a Heartbeat message by looking at its source: a heartbeat from this host or from the
// predecessor sets the matching "seen" flag; a heartbeat from any other host is only logged.
188 public State process(Heartbeat msg) {
189 String src = msg.getSource();
// NOTE(review): the condition that opens this if-chain is not visible in this extract; the log
// text and the later src.equals(...) calls suggest a null check on src - confirm.
192 logger.warn("Heartbeat message has no source on topic {}", getTopic());
194 } else if (src.equals(getHost())) {
195 logger.info("saw my heartbeat on topic {}", getTopic());
// read and cleared by the "my heart beat checker" task in addTimers()
196 myHeartbeatSeen = true;
198 } else if (src.equals(predHost)) {
199 logger.info("saw heartbeat from {} on topic {}", src, getTopic());
// read and cleared by the predecessor checker task in addTimers()
200 predHeartbeatSeen = true;
203 logger.info("ignored heartbeat message from {} on topic {}", src, getTopic());
// NOTE(review): the method's return statement is not visible in this extract.
// Handles a Leader message: a sender that sorts after this host is treated as unexpected and
// logged as a warning; otherwise the sender is accepted as the new leader and the state returned
// by goActive() for the message's assignments becomes the result.
210 public State process(Leader msg) {
// NOTE(review): several lines between the signature and the next statement are missing from
// this extract (possibly a validity check on msg) - confirm against the full file.
215 String src = msg.getSource();
// String.compareTo() orders lexicographically; a negative result means this host sorts before
// the sender
217 if (getHost().compareTo(src) < 0) {
218 // our host would be a better leader - find out what's up
219 logger.warn("unexpected Leader message from {} on topic {}", src, getTopic());
// NOTE(review): what this branch returns is not visible in this extract.
223 logger.info("have a new leader {} on topic {}", src, getTopic());
225 return goActive(msg.getAssignments());
// Handles an Offline message. Messages from hosts that hold no assignment are ignored. If this
// host is already the leader, or the departing host is this host's predecessor and sorts first
// in the assigned set, the departing host is removed and becomeLeader() is invoked with the
// remaining hosts. In every other case the message is only logged.
229 public State process(Offline msg) {
230 String src = msg.getSource();
// NOTE(review): the condition that opens this if-chain is not visible in this extract; the log
// text suggests a null check on src - confirm.
233 logger.warn("Offline message has no source on topic {}", getTopic());
236 } else if (!assigned.contains(src)) {
238 * the offline host wasn't assigned any buckets, so just ignore the message
240 logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic());
// second operand: the sender is our predecessor AND it is the lowest-sorted assigned host
243 } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) {
245 * Case 1: We are the leader.
247 * Case 2: Our predecessor was the leader and it has gone offline - we should
250 * In either case, we are now the leader and we must re-balance the buckets
251 * since one of the hosts has gone offline.
254 logger.info("Offline message from source {} on topic {}", src, getTopic());
// drop the offline host first so it is not part of the set handed to becomeLeader()
256 assigned.remove(src);
258 return becomeLeader(assigned);
262 * Otherwise, we don't care right now - we'll wait for the leader to tell us
265 logger.info("ignore Offline message from source {} on topic {}", src, getTopic());
// NOTE(review): the return statements of the non-leader branches are not visible in this extract.