/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.drools.pooling.state;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

42 * Any state in which events are being processed locally and forwarded, as appropriate.
45 public class ProcessingState extends State {
47 private static final Logger logger = LoggerFactory.getLogger(ProcessingState.class);
50 * Current known leader, never {@code null}.
54 private String leader;
59 * @param mgr pooling manager
60 * @param leader current known leader, which need not be the same as the assignment
61 * leader. Never {@code null}
62 * @throws IllegalArgumentException if an argument is invalid
64 public ProcessingState(PoolingManager mgr, String leader) {
68 throw new IllegalArgumentException("null leader");
71 BucketAssignments assignments = mgr.getAssignments();
73 if (assignments != null) {
74 String[] arr = assignments.getHostArray();
75 if (arr != null && arr.length == 0) {
76 throw new IllegalArgumentException("zero-length bucket assignments");
84 * Generates an Identification message and goes to the query state.
87 public State process(Query msg) {
88 logger.info("received Query message on topic {}", getTopic());
89 publish(makeIdentification());
94 * Sets the assignments.
96 * @param assignments new assignments, or {@code null}
98 protected final void setAssignments(BucketAssignments assignments) {
99 if (assignments != null) {
100 startDistributing(assignments);
105 * Determines if this host is the leader, based on the current assignments.
107 * @return {@code true} if this host is the leader, {@code false} otherwise
109 public boolean isLeader() {
110 return getHost().equals(leader);
114 * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}.
116 * @param alive hosts that are known to be alive
118 * @return the new state
120 protected State becomeLeader(SortedSet<String> alive) {
121 String newLeader = getHost();
123 if (!newLeader.equals(alive.first())) {
124 throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first());
127 Leader msg = makeLeader(alive);
128 logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());
132 return goActive(msg.getAssignments());
136 * Makes a leader message. Assumes "this" host is the leader, and thus appears as the
137 * first host in the set of hosts that are still alive.
139 * @param alive hosts that are known to be alive
141 * @return a new message
143 private Leader makeLeader(Set<String> alive) {
144 return new Leader(getHost(), makeAssignments(alive));
148 * Makes a set of bucket assignments. Assumes "this" host is the leader.
150 * @param alive hosts that are known to be alive
152 * @return a new set of bucket assignments
154 private BucketAssignments makeAssignments(Set<String> alive) {
156 // make a working array from the CURRENT assignments
157 String[] bucket2host = makeBucketArray();
159 TreeSet<String> avail = new TreeSet<>(alive);
161 // if we have more hosts than buckets, then remove the extra hosts
162 removeExcessHosts(bucket2host.length, avail);
164 // create a host bucket for each available host
165 Map<String, HostBucket> host2hb = new HashMap<>();
166 avail.forEach(host -> host2hb.put(host, new HostBucket(host)));
168 // add bucket indices to the appropriate host bucket
169 addIndicesToHostBuckets(bucket2host, host2hb);
171 // convert the collection back to an array
172 fillArray(host2hb.values(), bucket2host);
174 // update bucket2host with new assignments
175 rebalanceBuckets(host2hb.values(), bucket2host);
177 return new BucketAssignments(bucket2host);
181 * Makes a bucket array, copying the current assignments, if available.
183 * @return a new bucket array
185 private String[] makeBucketArray() {
186 BucketAssignments asgn = getAssignments();
188 return new String[BucketAssignments.MAX_BUCKETS];
191 String[] oldArray = asgn.getHostArray();
192 if (oldArray.length == 0) {
193 return new String[BucketAssignments.MAX_BUCKETS];
196 String[] newArray = new String[oldArray.length];
197 System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
203 * Removes excess hosts from the set of available hosts. Assumes "this" host is the
204 * leader, and thus appears as the first host in the set.
206 * @param maxHosts maximum number of hosts to be retained
207 * @param avail available hosts
209 private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
210 while (avail.size() > maxHosts) {
212 * Don't remove this host, as it's the leader. Since the leader is always at
213 * the front of the sorted set, we'll just pick off hosts from the back of the
216 String host = avail.last();
219 logger.warn("not using extra host {} for topic {}", host, getTopic());
224 * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or
225 * assigned to a host that does not appear within the map are re-assigned to a host
226 * that appears within the map.
228 * @param bucket2host bucket assignments
229 * @param host2data maps a host name to its {@link HostBucket}
231 private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
232 LinkedList<Integer> nullBuckets = new LinkedList<>();
234 for (int x = 0; x < bucket2host.length; ++x) {
235 String host = bucket2host[x];
240 HostBucket hb = host2data.get(host);
250 // assign the null buckets to other hosts
251 assignNullBuckets(nullBuckets, host2data.values());
255 * Assigns null buckets (i.e., those having no assignment) to available hosts.
257 * @param buckets buckets that still need to be assigned to hosts
258 * @param coll collection of current host-bucket assignments
260 private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
261 // assign null buckets to the hosts with the fewest buckets
262 TreeSet<HostBucket> assignments = new TreeSet<>(coll);
264 for (Integer index : buckets) {
265 // add it to the host with the shortest bucket list
266 HostBucket newhb = assignments.pollFirst();
269 // put the item back into the queue, with its new count
270 assignments.add(newhb);
275 * Re-balances the buckets, taking from those that have a larger count and giving to
276 * those that have a smaller count. Populates an output array with the new
279 * @param coll current bucket assignment
280 * @param bucket2host array to be populated with the new assignments
282 private void rebalanceBuckets(Collection<HostBucket> coll, String[] bucket2host) {
283 if (coll.size() <= 1) {
284 // only one hosts - nothing to rebalance
288 TreeSet<HostBucket> assignments = new TreeSet<>(coll);
291 HostBucket smaller = assignments.pollFirst();
292 HostBucket larger = assignments.pollLast();
294 if (larger.size() - smaller.size() <= 1) {
295 // it's as balanced as it will get
299 // move the bucket from the larger to the smaller
300 Integer bucket = larger.remove();
303 bucket2host[bucket] = smaller.host;
305 // put the items back, with their new counts
306 assignments.add(larger);
307 assignments.add(smaller);
313 * Fills the array with the host assignments.
315 * @param coll the host assignments
316 * @param bucket2host array to be filled
318 private void fillArray(Collection<HostBucket> coll, String[] bucket2host) {
319 for (HostBucket hb : coll) {
320 for (Integer index : hb.buckets) {
321 bucket2host[index] = hb.host;
327 * Tracks buckets that have been assigned to a host.
329 protected static class HostBucket implements Comparable<HostBucket> {
331 * Host to which the buckets have been assigned.
336 * Buckets that have been assigned to this host.
338 private Queue<Integer> buckets = new LinkedList<>();
345 public HostBucket(String host) {
350 * Removes the next bucket from the list.
352 * @return the next bucket
354 public final Integer remove() {
355 return buckets.remove();
359 * Adds a bucket to the list.
361 * @param index index of the bucket to add
363 public final void add(Integer index) {
370 * @return the number of buckets assigned to this host
372 public final int size() {
373 return buckets.size();
377 * Compares host buckets, first by the number of buckets, and then by the host
381 public final int compareTo(HostBucket other) {
382 int diff = buckets.size() - other.buckets.size();
384 diff = host.compareTo(other.host);
390 public final int hashCode() {
391 throw new UnsupportedOperationException("HostBucket cannot be hashed");
395 public final boolean equals(Object obj) {
396 throw new UnsupportedOperationException("cannot compare HostBuckets");