Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / main / java / org / onap / policy / drools / pooling / state / ProcessingState.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.LinkedList;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.SortedSet;
31 import java.util.TreeSet;
32 import lombok.Getter;
33 import lombok.NonNull;
34 import lombok.Setter;
35 import org.onap.policy.drools.pooling.PoolingManager;
36 import org.onap.policy.drools.pooling.message.BucketAssignments;
37 import org.onap.policy.drools.pooling.message.Leader;
38 import org.onap.policy.drools.pooling.message.Query;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Any state in which events are being processed locally and forwarded, as appropriate.
44  */
45 @Setter
46 @Getter
47 public class ProcessingState extends State {
48
49     private static final Logger logger = LoggerFactory.getLogger(ProcessingState.class);
50
51     /**
52      * Current known leader, never {@code null}.
53      */
54     @NonNull
55     private String leader;
56
57     /**
58      * Constructor.
59      *
60      * @param mgr pooling manager
61      * @param leader current known leader, which need not be the same as the assignment
62      *        leader. Never {@code null}
63      * @throws IllegalArgumentException if an argument is invalid
64      */
65     public ProcessingState(PoolingManager mgr, @NonNull String leader) {
66         super(mgr);
67
68         BucketAssignments assignments = mgr.getAssignments();
69
70         if (assignments != null) {
71             String[] arr = assignments.getHostArray();
72             if (arr != null && arr.length == 0) {
73                 throw new IllegalArgumentException("zero-length bucket assignments");
74             }
75         }
76
77         this.leader = leader;
78     }
79
80     /**
81      * Generates an Identification message and goes to the query state.
82      */
83     @Override
84     public State process(Query msg) {
85         logger.info("received Query message on topic {}", getTopic());
86         publish(makeIdentification());
87         return goQuery();
88     }
89
90     /**
91      * Sets the assignments.
92      *
93      * @param assignments new assignments, or {@code null}
94      */
95     protected final void setAssignments(BucketAssignments assignments) {
96         if (assignments != null) {
97             startDistributing(assignments);
98         }
99     }
100
101     /**
102      * Determines if this host is the leader, based on the current assignments.
103      *
104      * @return {@code true} if this host is the leader, {@code false} otherwise
105      */
106     public boolean isLeader() {
107         return getHost().equals(leader);
108     }
109
110     /**
111      * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}.
112      *
113      * @param alive hosts that are known to be alive
114      *
115      * @return the new state
116      */
117     protected State becomeLeader(SortedSet<String> alive) {
118         String newLeader = getHost();
119
120         if (!newLeader.equals(alive.first())) {
121             throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first());
122         }
123
124         var msg = makeLeader(alive);
125         logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());
126
127         publish(msg);
128
129         return goActive(msg.getAssignments());
130     }
131
132     /**
133      * Makes a leader message. Assumes "this" host is the leader, and thus appears as the
134      * first host in the set of hosts that are still alive.
135      *
136      * @param alive hosts that are known to be alive
137      *
138      * @return a new message
139      */
140     private Leader makeLeader(Set<String> alive) {
141         return new Leader(getHost(), makeAssignments(alive));
142     }
143
144     /**
145      * Makes a set of bucket assignments. Assumes "this" host is the leader.
146      *
147      * @param alive hosts that are known to be alive
148      *
149      * @return a new set of bucket assignments
150      */
151     private BucketAssignments makeAssignments(Set<String> alive) {
152
153         // make a working array from the CURRENT assignments
154         String[] bucket2host = makeBucketArray();
155
156         TreeSet<String> avail = new TreeSet<>(alive);
157
158         // if we have more hosts than buckets, then remove the extra hosts
159         removeExcessHosts(bucket2host.length, avail);
160
161         // create a host bucket for each available host
162         Map<String, HostBucket> host2hb = new HashMap<>();
163         avail.forEach(host -> host2hb.put(host, new HostBucket(host)));
164
165         // add bucket indices to the appropriate host bucket
166         addIndicesToHostBuckets(bucket2host, host2hb);
167
168         // convert the collection back to an array
169         fillArray(host2hb.values(), bucket2host);
170
171         // update bucket2host with new assignments
172         rebalanceBuckets(host2hb.values(), bucket2host);
173
174         return new BucketAssignments(bucket2host);
175     }
176
177     /**
178      * Makes a bucket array, copying the current assignments, if available.
179      *
180      * @return a new bucket array
181      */
182     private String[] makeBucketArray() {
183         BucketAssignments asgn = getAssignments();
184         if (asgn == null) {
185             return new String[BucketAssignments.MAX_BUCKETS];
186         }
187
188         String[] oldArray = asgn.getHostArray();
189         if (oldArray.length == 0) {
190             return new String[BucketAssignments.MAX_BUCKETS];
191         }
192
193         var newArray = new String[oldArray.length];
194         System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
195
196         return newArray;
197     }
198
199     /**
200      * Removes excess hosts from the set of available hosts. Assumes "this" host is the
201      * leader, and thus appears as the first host in the set.
202      *
203      * @param maxHosts maximum number of hosts to be retained
204      * @param avail available hosts
205      */
206     private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
207         while (avail.size() > maxHosts) {
208             /*
209              * Don't remove this host, as it's the leader. Since the leader is always at
210              * the front of the sorted set, we'll just pick off hosts from the back of the
211              * set.
212              */
213             String host = avail.last();
214             avail.remove(host);
215
216             logger.warn("not using extra host {} for topic {}", host, getTopic());
217         }
218     }
219
220     /**
221      * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or
222      * assigned to a host that does not appear within the map are re-assigned to a host
223      * that appears within the map.
224      *
225      * @param bucket2host bucket assignments
226      * @param host2data maps a host name to its {@link HostBucket}
227      */
228     private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
229         LinkedList<Integer> nullBuckets = new LinkedList<>();
230
231         for (var x = 0; x < bucket2host.length; ++x) {
232             String host = bucket2host[x];
233             if (host == null) {
234                 nullBuckets.add(x);
235
236             } else {
237                 HostBucket hb = host2data.get(host);
238                 if (hb == null) {
239                     nullBuckets.add(x);
240
241                 } else {
242                     hb.add(x);
243                 }
244             }
245         }
246
247         // assign the null buckets to other hosts
248         assignNullBuckets(nullBuckets, host2data.values());
249     }
250
251     /**
252      * Assigns null buckets (i.e., those having no assignment) to available hosts.
253      *
254      * @param buckets buckets that still need to be assigned to hosts
255      * @param coll collection of current host-bucket assignments
256      */
257     private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
258         // assign null buckets to the hosts with the fewest buckets
259         TreeSet<HostBucket> assignments = new TreeSet<>(coll);
260
261         for (Integer index : buckets) {
262             // add it to the host with the shortest bucket list
263             HostBucket newhb = assignments.pollFirst();
264             assert newhb != null;
265             newhb.add(index);
266
267             // put the item back into the queue, with its new count
268             assignments.add(newhb);
269         }
270     }
271
272     /**
273      * Re-balances the buckets, taking from those that have a larger count and giving to
274      * those that have a smaller count. Populates an output array with the new
275      * assignments.
276      *
277      * @param coll current bucket assignment
278      * @param bucket2host array to be populated with the new assignments
279      */
280     private void rebalanceBuckets(Collection<HostBucket> coll, String[] bucket2host) {
281         if (coll.size() <= 1) {
282             // only one hosts - nothing to rebalance
283             return;
284         }
285
286         TreeSet<HostBucket> assignments = new TreeSet<>(coll);
287
288         for (;;) {
289             HostBucket smaller = assignments.pollFirst();
290             HostBucket larger = assignments.pollLast();
291
292             assert larger != null && smaller != null;
293             if (larger.size() - smaller.size() <= 1) {
294                 // it's as balanced as it will get
295                 break;
296             }
297
298             // move the bucket from the larger to the smaller
299             Integer bucket = larger.remove();
300             smaller.add(bucket);
301
302             bucket2host[bucket] = smaller.host;
303
304             // put the items back, with their new counts
305             assignments.add(larger);
306             assignments.add(smaller);
307         }
308
309     }
310
311     /**
312      * Fills the array with the host assignments.
313      *
314      * @param coll the host assignments
315      * @param bucket2host array to be filled
316      */
317     private void fillArray(Collection<HostBucket> coll, String[] bucket2host) {
318         for (HostBucket hb : coll) {
319             for (Integer index : hb.buckets) {
320                 bucket2host[index] = hb.host;
321             }
322         }
323     }
324
325     /**
326      * Tracks buckets that have been assigned to a host.
327      */
328     protected static class HostBucket implements Comparable<HostBucket> {
329         /**
330          * Host to which the buckets have been assigned.
331          */
332         private String host;
333
334         /**
335          * Buckets that have been assigned to this host.
336          */
337         private Queue<Integer> buckets = new LinkedList<>();
338
339         /**
340          * Constructor.
341          *
342          * @param host host
343          */
344         public HostBucket(String host) {
345             this.host = host;
346         }
347
348         /**
349          * Removes the next bucket from the list.
350          *
351          * @return the next bucket
352          */
353         public final Integer remove() {
354             return buckets.remove();
355         }
356
357         /**
358          * Adds a bucket to the list.
359          *
360          * @param index index of the bucket to add
361          */
362         public final void add(Integer index) {
363             buckets.add(index);
364         }
365
366         /**
367          * Size.
368          *
369          * @return the number of buckets assigned to this host
370          */
371         public final int size() {
372             return buckets.size();
373         }
374
375         /**
376          * Compares host buckets, first by the number of buckets, and then by the host
377          * name.
378          */
379         @Override
380         public final int compareTo(HostBucket other) {
381             int diff = buckets.size() - other.buckets.size();
382             if (diff == 0) {
383                 diff = host.compareTo(other.host);
384             }
385             return diff;
386         }
387
388         @Override
389         public final int hashCode() {
390             throw new UnsupportedOperationException("HostBucket cannot be hashed");
391         }
392
393         @Override
394         public final boolean equals(Object obj) {
395             throw new UnsupportedOperationException("cannot compare HostBuckets");
396         }
397     }
398 }