2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling.state;
24 import java.util.TreeSet;
25 import org.onap.policy.drools.pooling.PoolingManager;
26 import org.onap.policy.drools.pooling.message.BucketAssignments;
27 import org.onap.policy.drools.pooling.message.Identification;
28 import org.onap.policy.drools.pooling.message.Leader;
29 import org.onap.policy.drools.pooling.message.Offline;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * The Query state. In this state, the host waits for the other hosts to identify
35 * themselves. Eventually, a leader should come forth. If not, it will transition to the
36 * active or inactive state, depending on whether it has an assignment in the
37 * current bucket assignments. The other possibility is that it may <i>become</i> the
38 * leader, in which case it will also transition to the active state.
40 public class QueryState extends ProcessingState {
42 private static final Logger logger = LoggerFactory.getLogger(QueryState.class);
45 * Hosts that have sent an "Identification" message. Always includes this host.
47 private final TreeSet<String> alive = new TreeSet<>();
50 * {@code True} if we saw our own Identification message, {@code false} otherwise.
52 private boolean sawSelfIdent = false;
59 public QueryState(PoolingManager mgr) {
// Builds the state on top of ProcessingState, handing the superclass the manager and
// this host's own name (mgr.getHost()) as the initial leader.
60 // this host is the leader, until a better candidate identifies itself
61 super(mgr, mgr.getHost());
70 // start identification timer
// awaitIdentification() hands a callback to schedule(...) together with the delay taken
// from getProperties().getIdentificationMs().
71 awaitIdentification();
75 * Starts a timer to wait for all Identification messages to arrive.
77 private void awaitIdentification() {
// Passes a callback to schedule(...) with the delay getProperties().getIdentificationMs().
// The callback resolves the Query state in one of three ways:
//   1. our own Ident message was never seen: log an error, return missedHeartbeat();
//   2. this host is the leader (isLeader()): return becomeLeader(alive);
//   3. otherwise: return goActive(getAssignments()) with the assignments currently held.
// NOTE(review): branch 1 is presumably guarded by the sawSelfIdent flag - confirm.
80 * Once we've waited long enough for all Identification messages to arrive, become
81 * the leader, assuming we should.
84 schedule(getProperties().getIdentificationMs(), () -> {
87 // didn't see our identification
88 logger.error("missed our own Ident message on topic {}", getTopic());
89 return missedHeartbeat();
91 } else if (isLeader()) {
92 // "this" host is the new leader
93 logger.info("this host is the new leader for topic {}", getTopic());
// "alive" (the hosts recorded so far) is handed to becomeLeader().
94 return becomeLeader(alive);
97 // not the leader - return to previous state
98 logger.info("no new leader on topic {}", getTopic());
99 return goActive(getAssignments());
105 public State goQuery() {
// NOTE(review): this object already is the Query state, so a further goQuery() request
// presumably causes no state change - confirm.
110 public State process(Identification msg) {
// Handles an Identification message. A message whose source equals getHost() is our own
// and is logged as such; a message from any other host is logged and passed to
// recordInfo() together with the assignments it carries.
// NOTE(review): seeing our own message presumably also sets sawSelfIdent - confirm.
112 if (getHost().equals(msg.getSource())) {
113 logger.info("saw our own Ident message on topic {}", getTopic());
117 logger.info("received Ident message from {} on topic {}", msg.getSource(), getTopic());
118 recordInfo(msg.getSource(), msg.getAssignments());
125 * If the message leader is the same as, or better than, the leader we have, then go active with it.
126 * Otherwise, simply treat it like an {@link Identification} message.
129 public State process(Leader msg) {
// Handles a Leader message. The sender's name is compared with the current leader using
// natural String ordering: a sender that sorts at or before getLeader() is accepted and
// this host goes active with the sender's assignments; otherwise the sender and its
// assignments are merely recorded through recordInfo().
// NOTE(review): source.compareTo(...) below requires a non-null source - confirm that the
// message is validated before this point.
134 String source = msg.getSource();
135 BucketAssignments asgn = msg.getAssignments();
137 // go active, if this has a leader that's the same or better than the one we have
138 if (source.compareTo(getLeader()) <= 0) {
139 logger.warn("leader with {} on topic {}", source, getTopic());
140 return goActive(asgn);
144 * The message does not have an acceptable leader, but we'll still record its
147 logger.info("record leader info from {} on topic {}", source, getTopic());
148 recordInfo(source, asgn);
154 public State process(Offline msg) {
// Handles an Offline message. Only a message with a non-null source that names a host
// other than this one is acted upon; anything else is logged as ignored.
155 String host = msg.getSource();
157 if (host != null && !host.equals(getHost())) {
158 logger.warn("host {} offline on topic {}", host, getTopic());
// The leader is re-derived as the lowest entry of the sorted "alive" set.
// TreeSet.first() throws NoSuchElementException on an empty set; "alive" is documented
// as always containing this host, which this call relies on.
160 setLeader(alive.first());
163 logger.info("ignored offline message from {} on topic {}", msg.getSource(), getTopic());
170 * Records info from a message, adding the source host name to {@link #alive}, and
171 * updating the bucket assignments if none are held yet or the new ones have a better (i.e., lesser) leader.
173 * @param source the message's source host
174 * @param assignments assignments, or {@code null}
176 private void recordInfo(String source, BucketAssignments assignments) {
// Two independent updates are made from one message:
//   - host bookkeeping: for a non-null source, the leader is re-derived as the lowest
//     entry of "alive" (natural String ordering, as the TreeSet has no comparator);
//   - assignments: offered assignments that are non-null and name a leader replace the
//     current ones when there are none yet, when the current ones have no leader, or
//     when the offered leader sorts strictly before the current one.
177 // add this message's source host to "alive"
178 if (source != null) {
180 setLeader(alive.first());
// NOTE(review): assignments that are null or carry no leader are presumably ignored
// from here on - confirm.
183 if (assignments == null || assignments.getLeader() == null) {
187 // record assignments, if we don't have any yet
188 BucketAssignments current = getAssignments();
189 if (current == null) {
190 logger.info("received initial assignments on topic {}", getTopic());
191 setAssignments(assignments);
196 * Record assignments, if the new assignments have a better (i.e., lesser) leader.
198 String curldr = current.getLeader();
// compareTo(...) < 0 means the offered leader name sorts strictly before the current one.
199 if (curldr == null || assignments.getLeader().compareTo(curldr) < 0) {
200 logger.info("use new assignments from {} on topic {}", source, getTopic());
201 setAssignments(assignments);