2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling.state;
24 import java.util.Arrays;
25 import java.util.TreeSet;
26 import lombok.AccessLevel;
28 import org.onap.policy.drools.pooling.PoolingManager;
29 import org.onap.policy.drools.pooling.message.Heartbeat;
30 import org.onap.policy.drools.pooling.message.Leader;
31 import org.onap.policy.drools.pooling.message.Offline;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * The active state. In this state, this host has one or more bucket assignments and
37 * processes any events associated with one of its buckets. Other events are forwarded to
38 * appropriate target hosts.
40 @Getter(AccessLevel.PROTECTED)
41 public class ActiveState extends ProcessingState {
43 private static final Logger logger = LoggerFactory.getLogger(ActiveState.class);
46 * Set of hosts that have been assigned a bucket.
48 @Getter(AccessLevel.NONE)
49 private final TreeSet<String> assigned = new TreeSet<>();
52 * Host that comes after this host, or {@code null} if it has no successor.
54 private String succHost = null;
57 * Host that comes before this host, or "" if it has no predecessor.
59 private String predHost = "";
62 * {@code True} if we saw this host's heart beat since the last check, {@code false}
65 private boolean myHeartbeatSeen = false;
68 * {@code True} if we saw the predecessor's heart beat since the last check,
69 * {@code false} otherwise.
71 private boolean predHeartbeatSeen = false;
77 * @param mgr pooling manager
// Builds the active state: hands the manager and the leader taken from the
// manager's current assignments to the superclass constructor, then seeds the
// sorted 'assigned' set with every host in the assignments' host array.
// NOTE(review): the embedded numbering skips lines 81 and 83-86, so any further
// constructor statements and the closing brace are not visible in this extract -
// confirm against the original file.
79 public ActiveState(PoolingManager mgr) {
80 super(mgr, mgr.getAssignments().getLeader());
// copy the assigned hosts into the TreeSet that the neighbor lookups rely on
82 assigned.addAll(Arrays.asList(mgr.getAssignments().getHostArray()));
88 * Determine this host's neighbors based on the order of the host UUIDs. Updates
89 * {@link #succHost} and {@link #predHost}.
// Works out this host's neighbors from the sorted 'assigned' set: the successor
// is the next-higher host and the predecessor is the next-lower host, each
// wrapping around to the opposite end of the set when this host sits at an edge.
91 private void detmNeighbors() {
// with fewer than two assigned hosts there is no other host to pair with
92 if (assigned.size() < 2) {
93 logger.info("this host has no neighbors on topic {}", getTopic());
95 * this host is the only one with any assignments - it has no neighbors
// NOTE(review): embedded lines 96-101 are missing from this extract; presumably
// they reset succHost/predHost and return early - confirm against the original.
// TreeSet.higher() yields the least element strictly greater than this host,
// or null when no such element exists
102 if ((succHost = assigned.higher(getHost())) == null) {
103 // wrapped around - successor is the first host in the set
104 succHost = assigned.first();
106 logger.info("this host's successor is {} on topic {}", succHost, getTopic());
// TreeSet.lower() yields the greatest element strictly less than this host,
// or null when no such element exists
108 if ((predHost = assigned.lower(getHost())) == null) {
109 // wrapped around - predecessor is the last host in the set
110 predHost = assigned.last();
112 logger.info("this host's predecessor is {} on topic {}", predHost, getTopic());
// NOTE(review): the body of start() (embedded lines 117-123) is missing from this
// extract, so its behavior cannot be documented from here; presumably it invokes
// the timer setup and heart beat generation defined below - confirm against the
// original file.
116 public void start() {
// Registers the periodic tasks used while in this state: a heart beat generator,
// a checker for this host's own heart beat and, only when there is a
// predecessor, a checker for the predecessor's heart beat.
125 private void addTimers() {
126 logger.info("add timers");
129 * heart beat generator
// the generator interval comes from the inter-heartbeat property; the same value
// is passed for both numeric arguments (presumably initial delay and repeat
// delay - confirm against scheduleWithFixedDelay's declaration)
131 long genMs = getProperties().getInterHeartbeatMs();
133 scheduleWithFixedDelay(genMs, genMs, () -> {
// NOTE(review): the generator lambda's body (embedded lines 134-137) is missing
// from this extract - confirm against the original file.
139 * my heart beat checker
// both checkers below are scheduled with this same interval
141 long waitMs = getProperties().getActiveHeartbeatMs();
143 scheduleWithFixedDelay(waitMs, waitMs, () -> {
144 if (myHeartbeatSeen) {
// seen since the previous check - clear the flag so the next interval starts fresh
145 myHeartbeatSeen = false;
149 // missed my heart beat
150 logger.error("missed my heartbeat on topic {}", getTopic());
// the task's result is whatever missedHeartbeat() returns
152 return missedHeartbeat();
156 * predecessor heart beat checker
// predHost is "" when there is no predecessor, in which case no checker is scheduled
158 if (!predHost.isEmpty()) {
160 scheduleWithFixedDelay(waitMs, waitMs, () -> {
161 if (predHeartbeatSeen) {
// seen since the previous check - clear the flag for the next interval
162 predHeartbeatSeen = false;
166 // missed the predecessor's heart beat
167 logger.warn("missed predecessor's heartbeat on topic {}", getTopic());
// publish the message built by makeQuery(); any follow-up step after this line
// is not visible in this extract
169 publish(makeQuery());
177 * Generates a heart beat and publishes it to this host and, if it has one, to its successor.
// Builds a single heart beat message stamped with the current wall-clock time and
// publishes that same message to this host and, when a successor exists, to the
// successor as well.
179 private void genHeartbeat() {
180 var msg = makeHeartbeat(System.currentTimeMillis());
// publish to our own host
181 publish(getHost(), msg);
// succHost is null when this host has no successor, so skip the second publish then
183 if (succHost != null) {
184 publish(succHost, msg);
// Handles a Heartbeat message by recording whose heart beat it was: this host's
// own (sets myHeartbeatSeen), the predecessor's (sets predHeartbeatSeen), or
// another host's (logged and ignored).
// NOTE(review): the return statement is not visible in this extract.
189 public State process(Heartbeat msg) {
190 String src = msg.getSource();
// NOTE(review): embedded line 192 is missing; the warning below and the
// "else if" that follows suggest it is a null check on src - confirm against
// the original file.
193 logger.warn("Heartbeat message has no source on topic {}", getTopic());
// heart beat that this host published to itself
195 } else if (src.equals(getHost())) {
196 logger.info("saw my heartbeat on topic {}", getTopic());
197 myHeartbeatSeen = true;
// heart beat from the host that precedes this one; predHost is "" when there
// is no predecessor
199 } else if (src.equals(predHost)) {
200 logger.info("saw heartbeat from {} on topic {}", src, getTopic());
201 predHeartbeatSeen = true;
// any other source is neither this host nor its predecessor
204 logger.info("ignored heartbeat message from {} on topic {}", src, getTopic());
// Handles a Leader message: warns when the sender sorts after this host,
// otherwise logs the new leader and continues via goActive() with the
// assignments carried by the message.
// NOTE(review): embedded lines 212-215 are missing from this extract; presumably
// they validate the message before the source is read - confirm against the
// original file.
211 public State process(Leader msg) {
216 String src = msg.getSource();
// compareTo() < 0 means this host's name sorts before the sender's
218 if (getHost().compareTo(src) < 0) {
219 // our host would be a better leader - find out what's up
220 logger.warn("unexpected Leader message from {} on topic {}", src, getTopic());
// NOTE(review): embedded lines 221-223 are missing, so what this branch does after
// the warning is not visible here - confirm against the original file.
224 logger.info("have a new leader {} on topic {}", src, getTopic());
// hand the message's assignments to goActive() and return the state it produces
226 return goActive(msg.getAssignments());
230 public State process(Offline msg) {
231 String src = msg.getSource();
234 logger.warn("Offline message has no source on topic {}", getTopic());
237 } else if (!assigned.contains(src)) {
239 * the offline host wasn't assigned any buckets, so just ignore the message
241 logger.info("ignore Offline message from unassigned source {} on topic {}", src, getTopic());
244 } else if (isLeader() || (predHost.equals(src) && predHost.equals(assigned.first()))) {
246 * Case 1: We are the leader.
248 * Case 2: Our predecessor was the leader, and it has gone offline - we should
251 * In either case, we are now the leader, and we must re-balance the buckets
252 * since one of the hosts has gone offline.
255 logger.info("Offline message from source {} on topic {}", src, getTopic());
257 assigned.remove(src);
259 return becomeLeader(assigned);
263 * Otherwise, we don't care right now - we'll wait for the leader to tell us
266 logger.info("ignore Offline message from source {} on topic {}", src, getTopic());