/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2024 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
22 package org.onap.policy.drools.pooling.state;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import lombok.NonNull;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
43 * Any state in which events are being processed locally and forwarded, as appropriate.
47 public class ProcessingState extends State {
49 private static final Logger logger = LoggerFactory.getLogger(ProcessingState.class);
52 * Current known leader, never {@code null}.
55 private String leader;
60 * @param mgr pooling manager
61 * @param leader current known leader, which need not be the same as the assignment
62 * leader. Never {@code null}
63 * @throws IllegalArgumentException if an argument is invalid
65 public ProcessingState(PoolingManager mgr, @NonNull String leader) {
68 BucketAssignments assignments = mgr.getAssignments();
70 if (assignments != null) {
71 String[] arr = assignments.getHostArray();
72 if (arr != null && arr.length == 0) {
73 throw new IllegalArgumentException("zero-length bucket assignments");
81 * Generates an Identification message and goes to the query state.
84 public State process(Query msg) {
85 logger.info("received Query message on topic {}", getTopic());
86 publish(makeIdentification());
91 * Sets the assignments.
93 * @param assignments new assignments, or {@code null}
95 protected final void setAssignments(BucketAssignments assignments) {
96 if (assignments != null) {
97 startDistributing(assignments);
102 * Determines if this host is the leader, based on the current assignments.
104 * @return {@code true} if this host is the leader, {@code false} otherwise
106 public boolean isLeader() {
107 return getHost().equals(leader);
111 * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}.
113 * @param alive hosts that are known to be alive
115 * @return the new state
117 protected State becomeLeader(SortedSet<String> alive) {
118 String newLeader = getHost();
120 if (!newLeader.equals(alive.first())) {
121 throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first());
124 var msg = makeLeader(alive);
125 logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());
129 return goActive(msg.getAssignments());
133 * Makes a leader message. Assumes "this" host is the leader, and thus appears as the
134 * first host in the set of hosts that are still alive.
136 * @param alive hosts that are known to be alive
138 * @return a new message
140 private Leader makeLeader(Set<String> alive) {
141 return new Leader(getHost(), makeAssignments(alive));
145 * Makes a set of bucket assignments. Assumes "this" host is the leader.
147 * @param alive hosts that are known to be alive
149 * @return a new set of bucket assignments
151 private BucketAssignments makeAssignments(Set<String> alive) {
153 // make a working array from the CURRENT assignments
154 String[] bucket2host = makeBucketArray();
156 TreeSet<String> avail = new TreeSet<>(alive);
158 // if we have more hosts than buckets, then remove the extra hosts
159 removeExcessHosts(bucket2host.length, avail);
161 // create a host bucket for each available host
162 Map<String, HostBucket> host2hb = new HashMap<>();
163 avail.forEach(host -> host2hb.put(host, new HostBucket(host)));
165 // add bucket indices to the appropriate host bucket
166 addIndicesToHostBuckets(bucket2host, host2hb);
168 // convert the collection back to an array
169 fillArray(host2hb.values(), bucket2host);
171 // update bucket2host with new assignments
172 rebalanceBuckets(host2hb.values(), bucket2host);
174 return new BucketAssignments(bucket2host);
178 * Makes a bucket array, copying the current assignments, if available.
180 * @return a new bucket array
182 private String[] makeBucketArray() {
183 BucketAssignments asgn = getAssignments();
185 return new String[BucketAssignments.MAX_BUCKETS];
188 String[] oldArray = asgn.getHostArray();
189 if (oldArray.length == 0) {
190 return new String[BucketAssignments.MAX_BUCKETS];
193 var newArray = new String[oldArray.length];
194 System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
200 * Removes excess hosts from the set of available hosts. Assumes "this" host is the
201 * leader, and thus appears as the first host in the set.
203 * @param maxHosts maximum number of hosts to be retained
204 * @param avail available hosts
206 private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
207 while (avail.size() > maxHosts) {
209 * Don't remove this host, as it's the leader. Since the leader is always at
210 * the front of the sorted set, we'll just pick off hosts from the back of the
213 String host = avail.last();
216 logger.warn("not using extra host {} for topic {}", host, getTopic());
221 * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or
222 * assigned to a host that does not appear within the map are re-assigned to a host
223 * that appears within the map.
225 * @param bucket2host bucket assignments
226 * @param host2data maps a host name to its {@link HostBucket}
228 private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
229 LinkedList<Integer> nullBuckets = new LinkedList<>();
231 for (var x = 0; x < bucket2host.length; ++x) {
232 String host = bucket2host[x];
237 HostBucket hb = host2data.get(host);
247 // assign the null buckets to other hosts
248 assignNullBuckets(nullBuckets, host2data.values());
252 * Assigns null buckets (i.e., those having no assignment) to available hosts.
254 * @param buckets buckets that still need to be assigned to hosts
255 * @param coll collection of current host-bucket assignments
257 private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
258 // assign null buckets to the hosts with the fewest buckets
259 TreeSet<HostBucket> assignments = new TreeSet<>(coll);
261 for (Integer index : buckets) {
262 // add it to the host with the shortest bucket list
263 HostBucket newhb = assignments.pollFirst();
264 assert newhb != null;
267 // put the item back into the queue, with its new count
268 assignments.add(newhb);
273 * Re-balances the buckets, taking from those that have a larger count and giving to
274 * those that have a smaller count. Populates an output array with the new
277 * @param coll current bucket assignment
278 * @param bucket2host array to be populated with the new assignments
280 private void rebalanceBuckets(Collection<HostBucket> coll, String[] bucket2host) {
281 if (coll.size() <= 1) {
282 // only one hosts - nothing to rebalance
286 TreeSet<HostBucket> assignments = new TreeSet<>(coll);
289 HostBucket smaller = assignments.pollFirst();
290 HostBucket larger = assignments.pollLast();
292 assert larger != null && smaller != null;
293 if (larger.size() - smaller.size() <= 1) {
294 // it's as balanced as it will get
298 // move the bucket from the larger to the smaller
299 Integer bucket = larger.remove();
302 bucket2host[bucket] = smaller.host;
304 // put the items back, with their new counts
305 assignments.add(larger);
306 assignments.add(smaller);
312 * Fills the array with the host assignments.
314 * @param coll the host assignments
315 * @param bucket2host array to be filled
317 private void fillArray(Collection<HostBucket> coll, String[] bucket2host) {
318 for (HostBucket hb : coll) {
319 for (Integer index : hb.buckets) {
320 bucket2host[index] = hb.host;
326 * Tracks buckets that have been assigned to a host.
328 protected static class HostBucket implements Comparable<HostBucket> {
330 * Host to which the buckets have been assigned.
335 * Buckets that have been assigned to this host.
337 private Queue<Integer> buckets = new LinkedList<>();
344 public HostBucket(String host) {
349 * Removes the next bucket from the list.
351 * @return the next bucket
353 public final Integer remove() {
354 return buckets.remove();
358 * Adds a bucket to the list.
360 * @param index index of the bucket to add
362 public final void add(Integer index) {
369 * @return the number of buckets assigned to this host
371 public final int size() {
372 return buckets.size();
376 * Compares host buckets, first by the number of buckets, and then by the host
380 public final int compareTo(HostBucket other) {
381 int diff = buckets.size() - other.buckets.size();
383 diff = host.compareTo(other.host);
389 public final int hashCode() {
390 throw new UnsupportedOperationException("HostBucket cannot be hashed");
394 public final boolean equals(Object obj) {
395 throw new UnsupportedOperationException("cannot compare HostBuckets");