Drools support for Kafka topics
[policy/drools-pdp.git] / feature-pooling-messages / src / test / java / org / onap / policy / drools / pooling / state / ProcessingStateTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import static org.junit.jupiter.api.Assertions.assertEquals;
25 import static org.junit.jupiter.api.Assertions.assertFalse;
26 import static org.junit.jupiter.api.Assertions.assertNotNull;
27 import static org.junit.jupiter.api.Assertions.assertNotSame;
28 import static org.junit.jupiter.api.Assertions.assertThrows;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30 import static org.mockito.ArgumentMatchers.any;
31 import static org.mockito.Mockito.mock;
32 import static org.mockito.Mockito.never;
33 import static org.mockito.Mockito.verify;
34 import static org.mockito.Mockito.when;
35
36 import java.util.Arrays;
37 import org.junit.jupiter.api.BeforeEach;
38 import org.junit.jupiter.api.Test;
39 import org.onap.policy.drools.pooling.message.BucketAssignments;
40 import org.onap.policy.drools.pooling.message.Identification;
41 import org.onap.policy.drools.pooling.message.Leader;
42 import org.onap.policy.drools.pooling.message.Message;
43 import org.onap.policy.drools.pooling.message.Query;
44 import org.onap.policy.drools.pooling.state.ProcessingState.HostBucket;
45
46 class ProcessingStateTest extends SupportBasicStateTester {
47
48     private ProcessingState state;
49     private HostBucket hostBucket;
50
51     /**
52      * Setup.
53      */
54     @BeforeEach
55     public void setUp() throws Exception {
56         super.setUp();
57
58         state = new ProcessingState(mgr, MY_HOST);
59         hostBucket = new HostBucket(MY_HOST);
60     }
61
62     @Test
63     void testProcessQuery() {
64         State next = mock(State.class);
65         when(mgr.goQuery()).thenReturn(next);
66
67         assertEquals(next, state.process(new Query()));
68
69         Identification ident = captureAdminMessage(Identification.class);
70         assertEquals(MY_HOST, ident.getSource());
71         assertEquals(ASGN3, ident.getAssignments());
72     }
73
74     @Test
75     void testProcessingState() {
76         /*
77          * Null assignments should be OK.
78          */
79         when(mgr.getAssignments()).thenReturn(null);
80         state = new ProcessingState(mgr, LEADER);
81
82         /*
83          * Empty assignments should be OK.
84          */
85         when(mgr.getAssignments()).thenReturn(EMPTY_ASGN);
86         state = new ProcessingState(mgr, LEADER);
87         assertEquals(MY_HOST, state.getHost());
88         assertEquals(LEADER, state.getLeader());
89         assertEquals(EMPTY_ASGN, state.getAssignments());
90
91         /*
92          * Now try something with assignments.
93          */
94         when(mgr.getAssignments()).thenReturn(ASGN3);
95         state = new ProcessingState(mgr, LEADER);
96
97         /*
98          * Prove the state is attached to the manager by invoking getHost(), which
99          * delegates to the manager.
100          */
101         assertEquals(MY_HOST, state.getHost());
102
103         assertEquals(LEADER, state.getLeader());
104         assertEquals(ASGN3, state.getAssignments());
105     }
106
107     @Test
108     void testProcessingState_NullLeader() {
109         when(mgr.getAssignments()).thenReturn(EMPTY_ASGN);
110         assertThrows(NullPointerException.class, () -> state = new ProcessingState(mgr, null));
111     }
112
113     @Test
114     void testProcessingState_ZeroLengthHostArray() {
115         when(mgr.getAssignments()).thenReturn(new BucketAssignments(new String[] {}));
116         assertThrows(IllegalArgumentException.class, () -> state = new ProcessingState(mgr, LEADER));
117     }
118
119     @Test
120     void testGetAssignments() {
121         // assignments from constructor
122         assertEquals(ASGN3, state.getAssignments());
123
124         // null assignments - no effect
125         state.setAssignments(null);
126         assertEquals(ASGN3, state.getAssignments());
127
128         // empty assignments
129         state.setAssignments(EMPTY_ASGN);
130         assertEquals(EMPTY_ASGN, state.getAssignments());
131
132         // non-empty assignments
133         state.setAssignments(ASGN3);
134         assertEquals(ASGN3, state.getAssignments());
135     }
136
137     @Test
138     void testSetAssignments() {
139         state.setAssignments(null);
140         verify(mgr, never()).startDistributing(any());
141
142         state.setAssignments(ASGN3);
143         verify(mgr).startDistributing(ASGN3);
144     }
145
146     @Test
147     void testGetLeader() {
148         // check value from constructor
149         assertEquals(MY_HOST, state.getLeader());
150
151         state.setLeader(HOST2);
152         assertEquals(HOST2, state.getLeader());
153
154         state.setLeader(HOST3);
155         assertEquals(HOST3, state.getLeader());
156     }
157
158     @Test
159     void testSetLeader() {
160         state.setLeader(MY_HOST);
161         assertEquals(MY_HOST, state.getLeader());
162     }
163
164     @Test
165     void testSetLeader_Null() {
166         assertThrows(NullPointerException.class, () -> state.setLeader(null));
167     }
168
169     @Test
170     void testIsLeader() {
171         state.setLeader(MY_HOST);
172         assertTrue(state.isLeader());
173
174         state.setLeader(HOST2);
175         assertFalse(state.isLeader());
176     }
177
178     @Test
179     void testBecomeLeader() {
180         State next = mock(State.class);
181         when(mgr.goActive()).thenReturn(next);
182
183         assertEquals(next, state.becomeLeader(sortHosts(MY_HOST, HOST2)));
184
185         Leader msg = captureAdminMessage(Leader.class);
186
187         verify(mgr).startDistributing(msg.getAssignments());
188         verify(mgr).goActive();
189     }
190
191     @Test
192     void testBecomeLeader_NotFirstAlive() {
193         // alive list contains something before my host name
194         var sortedHosts = sortHosts(PREV_HOST, MY_HOST);
195         assertThrows(IllegalArgumentException.class, () -> state.becomeLeader(sortedHosts));
196     }
197
198     @Test
199     void testMakeLeader() throws Exception {
200         state.becomeLeader(sortHosts(MY_HOST, HOST2));
201
202         Leader msg = captureAdminMessage(Leader.class);
203
204         // need a channel before invoking checkValidity()
205         msg.setChannel(Message.ADMIN);
206
207         msg.checkValidity();
208
209         assertEquals(MY_HOST, msg.getSource());
210         assertNotNull(msg.getAssignments());
211         assertTrue(msg.getAssignments().hasAssignment(MY_HOST));
212         assertTrue(msg.getAssignments().hasAssignment(HOST2));
213
214         // this one wasn't in the list of hosts, so it should have been removed
215         assertFalse(msg.getAssignments().hasAssignment(HOST1));
216     }
217
218     @Test
219     void testMakeAssignments() throws Exception {
220         state.becomeLeader(sortHosts(MY_HOST, HOST2));
221
222         captureAssignments().checkValidity();
223     }
224
225     @Test
226     void testMakeBucketArray_NullAssignments() {
227         when(mgr.getAssignments()).thenReturn(null);
228         state = new ProcessingState(mgr, MY_HOST);
229         state.becomeLeader(sortHosts(MY_HOST));
230
231         String[] arr = captureHostArray();
232
233         assertEquals(BucketAssignments.MAX_BUCKETS, arr.length);
234
235         assertTrue(Arrays.stream(arr).allMatch(MY_HOST::equals));
236     }
237
238     @Test
239     void testMakeBucketArray_ZeroAssignments() {
240         // bucket assignment with a zero-length array
241         state.setAssignments(new BucketAssignments(new String[0]));
242
243         state.becomeLeader(sortHosts(MY_HOST));
244
245         String[] arr = captureHostArray();
246
247         assertEquals(BucketAssignments.MAX_BUCKETS, arr.length);
248
249         assertTrue(Arrays.stream(arr).allMatch(MY_HOST::equals));
250     }
251
252     @Test
253     void testMakeBucketArray() {
254         /*
255          * All hosts are still alive, so it should have the exact same assignments as it
256          * had to start.
257          */
258         state.setAssignments(ASGN3);
259         state.becomeLeader(sortHosts(HOST_ARR3));
260
261         String[] arr = captureHostArray();
262
263         assertNotSame(HOST_ARR3, arr);
264         assertEquals(Arrays.asList(HOST_ARR3), Arrays.asList(arr));
265     }
266
267     @Test
268     void testRemoveExcessHosts() {
269         /*
270          * All hosts are still alive, plus some others.
271          */
272         state.setAssignments(ASGN3);
273         state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3, HOST4));
274
275         // assignments should be unchanged
276         assertEquals(Arrays.asList(HOST_ARR3), captureHostList());
277     }
278
279     @Test
280     void testAddIndicesToHostBuckets() {
281         // some are null, some hosts are no longer alive
282         String[] asgn = {null, MY_HOST, HOST3, null, HOST4, HOST1, HOST2};
283
284         state.setAssignments(new BucketAssignments(asgn));
285         state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2));
286
287         // every bucket should be assigned to one of the three hosts
288         String[] expected = {MY_HOST, MY_HOST, HOST1, HOST2, MY_HOST, HOST1, HOST2};
289         assertEquals(Arrays.asList(expected), captureHostList());
290     }
291
292     @Test
293     void testAssignNullBuckets() {
294         /*
295          * Ensure buckets are assigned to the host with the fewest buckets.
296          */
297         String[] asgn = {MY_HOST, HOST1, MY_HOST, null, null, null, null, null, MY_HOST};
298
299         state.setAssignments(new BucketAssignments(asgn));
300         state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2));
301
302         String[] expected = {MY_HOST, HOST1, MY_HOST, HOST2, HOST1, HOST2, HOST1, HOST2, MY_HOST};
303         assertEquals(Arrays.asList(expected), captureHostList());
304     }
305
306     @Test
307     void testRebalanceBuckets() {
308         /*
309          * Some are very lopsided.
310          */
311         String[] asgn = {MY_HOST, HOST1, MY_HOST, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3};
312
313         state.setAssignments(new BucketAssignments(asgn));
314         state.becomeLeader(sortHosts(MY_HOST, HOST1, HOST2, HOST3));
315
316         String[] expected = {HOST2, HOST1, HOST3, MY_HOST, MY_HOST, MY_HOST, HOST1, HOST2, HOST1, HOST3};
317         assertEquals(Arrays.asList(expected), captureHostList());
318     }
319
320     @Test
321     void testHostBucketRemove_testHostBucketAdd_testHostBucketSize() {
322         assertEquals(0, hostBucket.size());
323
324         hostBucket.add(20);
325         hostBucket.add(30);
326         hostBucket.add(40);
327         assertEquals(3, hostBucket.size());
328
329         assertEquals(20, hostBucket.remove().intValue());
330         assertEquals(30, hostBucket.remove().intValue());
331         assertEquals(1, hostBucket.size());
332
333         // add more before taking the last item
334         hostBucket.add(50);
335         hostBucket.add(60);
336         assertEquals(3, hostBucket.size());
337
338         assertEquals(40, hostBucket.remove().intValue());
339         assertEquals(50, hostBucket.remove().intValue());
340         assertEquals(60, hostBucket.remove().intValue());
341         assertEquals(0, hostBucket.size());
342
343         // add more AFTER taking the last item
344         hostBucket.add(70);
345         assertEquals(70, hostBucket.remove().intValue());
346         assertEquals(0, hostBucket.size());
347     }
348
349     @Test
350     void testHostBucketCompareTo() {
351         HostBucket hb1 = new HostBucket(PREV_HOST);
352         HostBucket hb2 = new HostBucket(MY_HOST);
353
354         assertEquals(0, hb1.compareTo(hb1));
355         assertEquals(0, hb1.compareTo(new HostBucket(PREV_HOST)));
356
357         // both empty
358         assertTrue(hb1.compareTo(hb2) < 0);
359         assertTrue(hb2.compareTo(hb1) > 0);
360
361         // hb1 has one bucket, so it should not be larger
362         hb1.add(100);
363         assertTrue(hb1.compareTo(hb2) > 0);
364         assertTrue(hb2.compareTo(hb1) < 0);
365
366         // hb1 has two buckets, so it should still be larger
367         hb1.add(200);
368         assertTrue(hb1.compareTo(hb2) > 0);
369         assertTrue(hb2.compareTo(hb1) < 0);
370
371         // hb1 has two buckets, hb2 has one, so hb1 should still be larger
372         hb2.add(1000);
373         assertTrue(hb1.compareTo(hb2) > 0);
374         assertTrue(hb2.compareTo(hb1) < 0);
375
376         // same number of buckets, so hb2 should now be larger
377         hb2.add(2000);
378         assertTrue(hb1.compareTo(hb2) < 0);
379         assertTrue(hb2.compareTo(hb1) > 0);
380
381         // hb2 has more buckets, it should still be larger
382         hb2.add(3000);
383         assertTrue(hb1.compareTo(hb2) < 0);
384         assertTrue(hb2.compareTo(hb1) > 0);
385     }
386
387     @Test
388     void testHostBucketHashCode() {
389         assertThrows(UnsupportedOperationException.class, () -> hostBucket.hashCode());
390     }
391
392     @Test
393     void testHostBucketEquals() {
394         assertThrows(UnsupportedOperationException.class, () -> hostBucket.equals(hostBucket));
395     }
396 }