Use lombok in drools-pdp #3
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / state / ProcessingState.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling.state;
22
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.LinkedList;
26 import java.util.Map;
27 import java.util.Queue;
28 import java.util.Set;
29 import java.util.SortedSet;
30 import java.util.TreeSet;
31 import lombok.Getter;
32 import lombok.NonNull;
33 import lombok.Setter;
34 import org.onap.policy.drools.pooling.PoolingManager;
35 import org.onap.policy.drools.pooling.message.BucketAssignments;
36 import org.onap.policy.drools.pooling.message.Leader;
37 import org.onap.policy.drools.pooling.message.Query;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Any state in which events are being processed locally and forwarded, as appropriate.
43  */
44 @Getter
45 public class ProcessingState extends State {
46
47     private static final Logger logger = LoggerFactory.getLogger(ProcessingState.class);
48
49     /**
50      * Current known leader, never {@code null}.
51      */
52     @NonNull
53     @Setter
54     private String leader;
55
56     /**
57      * Constructor.
58      *
59      * @param mgr pooling manager
60      * @param leader current known leader, which need not be the same as the assignment
61      *        leader. Never {@code null}
62      * @throws IllegalArgumentException if an argument is invalid
63      */
64     public ProcessingState(PoolingManager mgr, String leader) {
65         super(mgr);
66
67         if (leader == null) {
68             throw new IllegalArgumentException("null leader");
69         }
70
71         BucketAssignments assignments = mgr.getAssignments();
72
73         if (assignments != null) {
74             String[] arr = assignments.getHostArray();
75             if (arr != null && arr.length == 0) {
76                 throw new IllegalArgumentException("zero-length bucket assignments");
77             }
78         }
79
80         this.leader = leader;
81     }
82
83     /**
84      * Generates an Identification message and goes to the query state.
85      */
86     @Override
87     public State process(Query msg) {
88         logger.info("received Query message on topic {}", getTopic());
89         publish(makeIdentification());
90         return goQuery();
91     }
92
93     /**
94      * Sets the assignments.
95      *
96      * @param assignments new assignments, or {@code null}
97      */
98     protected final void setAssignments(BucketAssignments assignments) {
99         if (assignments != null) {
100             startDistributing(assignments);
101         }
102     }
103
104     /**
105      * Determines if this host is the leader, based on the current assignments.
106      *
107      * @return {@code true} if this host is the leader, {@code false} otherwise
108      */
109     public boolean isLeader() {
110         return getHost().equals(leader);
111     }
112
113     /**
114      * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}.
115      *
116      * @param alive hosts that are known to be alive
117      *
118      * @return the new state
119      */
120     protected State becomeLeader(SortedSet<String> alive) {
121         String newLeader = getHost();
122
123         if (!newLeader.equals(alive.first())) {
124             throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first());
125         }
126
127         Leader msg = makeLeader(alive);
128         logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());
129
130         publish(msg);
131
132         return goActive(msg.getAssignments());
133     }
134
135     /**
136      * Makes a leader message. Assumes "this" host is the leader, and thus appears as the
137      * first host in the set of hosts that are still alive.
138      *
139      * @param alive hosts that are known to be alive
140      *
141      * @return a new message
142      */
143     private Leader makeLeader(Set<String> alive) {
144         return new Leader(getHost(), makeAssignments(alive));
145     }
146
147     /**
148      * Makes a set of bucket assignments. Assumes "this" host is the leader.
149      *
150      * @param alive hosts that are known to be alive
151      *
152      * @return a new set of bucket assignments
153      */
154     private BucketAssignments makeAssignments(Set<String> alive) {
155
156         // make a working array from the CURRENT assignments
157         String[] bucket2host = makeBucketArray();
158
159         TreeSet<String> avail = new TreeSet<>(alive);
160
161         // if we have more hosts than buckets, then remove the extra hosts
162         removeExcessHosts(bucket2host.length, avail);
163
164         // create a host bucket for each available host
165         Map<String, HostBucket> host2hb = new HashMap<>();
166         avail.forEach(host -> host2hb.put(host, new HostBucket(host)));
167
168         // add bucket indices to the appropriate host bucket
169         addIndicesToHostBuckets(bucket2host, host2hb);
170
171         // convert the collection back to an array
172         fillArray(host2hb.values(), bucket2host);
173
174         // update bucket2host with new assignments
175         rebalanceBuckets(host2hb.values(), bucket2host);
176
177         return new BucketAssignments(bucket2host);
178     }
179
180     /**
181      * Makes a bucket array, copying the current assignments, if available.
182      *
183      * @return a new bucket array
184      */
185     private String[] makeBucketArray() {
186         BucketAssignments asgn = getAssignments();
187         if (asgn == null) {
188             return new String[BucketAssignments.MAX_BUCKETS];
189         }
190
191         String[] oldArray = asgn.getHostArray();
192         if (oldArray.length == 0) {
193             return new String[BucketAssignments.MAX_BUCKETS];
194         }
195
196         String[] newArray = new String[oldArray.length];
197         System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
198
199         return newArray;
200     }
201
202     /**
203      * Removes excess hosts from the set of available hosts. Assumes "this" host is the
204      * leader, and thus appears as the first host in the set.
205      *
206      * @param maxHosts maximum number of hosts to be retained
207      * @param avail available hosts
208      */
209     private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
210         while (avail.size() > maxHosts) {
211             /*
212              * Don't remove this host, as it's the leader. Since the leader is always at
213              * the front of the sorted set, we'll just pick off hosts from the back of the
214              * set.
215              */
216             String host = avail.last();
217             avail.remove(host);
218
219             logger.warn("not using extra host {} for topic {}", host, getTopic());
220         }
221     }
222
223     /**
224      * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or
225      * assigned to a host that does not appear within the map are re-assigned to a host
226      * that appears within the map.
227      *
228      * @param bucket2host bucket assignments
229      * @param host2data maps a host name to its {@link HostBucket}
230      */
231     private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
232         LinkedList<Integer> nullBuckets = new LinkedList<>();
233
234         for (int x = 0; x < bucket2host.length; ++x) {
235             String host = bucket2host[x];
236             if (host == null) {
237                 nullBuckets.add(x);
238
239             } else {
240                 HostBucket hb = host2data.get(host);
241                 if (hb == null) {
242                     nullBuckets.add(x);
243
244                 } else {
245                     hb.add(x);
246                 }
247             }
248         }
249
250         // assign the null buckets to other hosts
251         assignNullBuckets(nullBuckets, host2data.values());
252     }
253
254     /**
255      * Assigns null buckets (i.e., those having no assignment) to available hosts.
256      *
257      * @param buckets buckets that still need to be assigned to hosts
258      * @param coll collection of current host-bucket assignments
259      */
260     private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
261         // assign null buckets to the hosts with the fewest buckets
262         TreeSet<HostBucket> assignments = new TreeSet<>(coll);
263
264         for (Integer index : buckets) {
265             // add it to the host with the shortest bucket list
266             HostBucket newhb = assignments.pollFirst();
267             newhb.add(index);
268
269             // put the item back into the queue, with its new count
270             assignments.add(newhb);
271         }
272     }
273
274     /**
275      * Re-balances the buckets, taking from those that have a larger count and giving to
276      * those that have a smaller count. Populates an output array with the new
277      * assignments.
278      *
279      * @param coll current bucket assignment
280      * @param bucket2host array to be populated with the new assignments
281      */
282     private void rebalanceBuckets(Collection<HostBucket> coll, String[] bucket2host) {
283         if (coll.size() <= 1) {
284             // only one hosts - nothing to rebalance
285             return;
286         }
287
288         TreeSet<HostBucket> assignments = new TreeSet<>(coll);
289
290         for (;;) {
291             HostBucket smaller = assignments.pollFirst();
292             HostBucket larger = assignments.pollLast();
293
294             if (larger.size() - smaller.size() <= 1) {
295                 // it's as balanced as it will get
296                 break;
297             }
298
299             // move the bucket from the larger to the smaller
300             Integer bucket = larger.remove();
301             smaller.add(bucket);
302
303             bucket2host[bucket] = smaller.host;
304
305             // put the items back, with their new counts
306             assignments.add(larger);
307             assignments.add(smaller);
308         }
309
310     }
311
312     /**
313      * Fills the array with the host assignments.
314      *
315      * @param coll the host assignments
316      * @param bucket2host array to be filled
317      */
318     private void fillArray(Collection<HostBucket> coll, String[] bucket2host) {
319         for (HostBucket hb : coll) {
320             for (Integer index : hb.buckets) {
321                 bucket2host[index] = hb.host;
322             }
323         }
324     }
325
326     /**
327      * Tracks buckets that have been assigned to a host.
328      */
329     protected static class HostBucket implements Comparable<HostBucket> {
330         /**
331          * Host to which the buckets have been assigned.
332          */
333         private String host;
334
335         /**
336          * Buckets that have been assigned to this host.
337          */
338         private Queue<Integer> buckets = new LinkedList<>();
339
340         /**
341          * Constructor.
342          *
343          * @param host host
344          */
345         public HostBucket(String host) {
346             this.host = host;
347         }
348
349         /**
350          * Removes the next bucket from the list.
351          *
352          * @return the next bucket
353          */
354         public final Integer remove() {
355             return buckets.remove();
356         }
357
358         /**
359          * Adds a bucket to the list.
360          *
361          * @param index index of the bucket to add
362          */
363         public final void add(Integer index) {
364             buckets.add(index);
365         }
366
367         /**
368          * Size.
369          *
370          * @return the number of buckets assigned to this host
371          */
372         public final int size() {
373             return buckets.size();
374         }
375
376         /**
377          * Compares host buckets, first by the number of buckets, and then by the host
378          * name.
379          */
380         @Override
381         public final int compareTo(HostBucket other) {
382             int diff = buckets.size() - other.buckets.size();
383             if (diff == 0) {
384                 diff = host.compareTo(other.host);
385             }
386             return diff;
387         }
388
389         @Override
390         public final int hashCode() {
391             throw new UnsupportedOperationException("HostBucket cannot be hashed");
392         }
393
394         @Override
395         public final boolean equals(Object obj) {
396             throw new UnsupportedOperationException("cannot compare HostBuckets");
397         }
398     }
399 }