Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / test / java / org / onap / policy / drools / pooling / state / QueryStateTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2020, 2024 Nordix Foundation
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import static org.junit.jupiter.api.Assertions.assertEquals;
25 import static org.junit.jupiter.api.Assertions.assertNotNull;
26 import static org.junit.jupiter.api.Assertions.assertNull;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import org.apache.commons.lang3.tuple.Pair;
36 import org.junit.jupiter.api.BeforeEach;
37 import org.junit.jupiter.api.Test;
38 import org.onap.policy.drools.pooling.message.BucketAssignments;
39 import org.onap.policy.drools.pooling.message.Identification;
40 import org.onap.policy.drools.pooling.message.Leader;
41 import org.onap.policy.drools.pooling.message.Offline;
42
43 class QueryStateTest extends SupportBasicStateTester {
44
45     private QueryState state;
46
47     /**
48      * Setup.
49      */
50     @Override
51     @BeforeEach
52     public void setUp() throws Exception {
53         super.setUp();
54
55         state = new QueryState(mgr);
56     }
57
58     @Test
59     void testGoQuery() {
60         assertNull(state.goQuery());
61     }
62
63     @Test
64     void testStart() {
65         state.start();
66
67         Pair<Long, StateTimerTask> timer = onceTasks.remove();
68
69         assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
70         assertNotNull(timer.getRight());
71     }
72
73     @Test
74     void testProcessIdentification_SameSource() {
75         String[] arr = {HOST2, PREV_HOST, MY_HOST};
76         BucketAssignments asgn = new BucketAssignments(arr);
77
78         assertNull(state.process(new Identification(MY_HOST, asgn)));
79
80         // info should be unchanged
81         assertEquals(MY_HOST, state.getLeader());
82         verify(mgr, never()).startDistributing(asgn);
83     }
84
85     @Test
86     void testProcessIdentification_DiffSource() {
87         String[] arr = {HOST2, PREV_HOST, MY_HOST};
88         BucketAssignments asgn = new BucketAssignments(arr);
89
90         assertNull(state.process(new Identification(HOST2, asgn)));
91
92         // leader should be unchanged
93         assertEquals(MY_HOST, state.getLeader());
94
95         // should have picked up the assignments
96         verify(mgr).startDistributing(asgn);
97     }
98
99     @Test
100     void testProcessLeader_Invalid() {
101         Leader msg = new Leader(PREV_HOST, null);
102
103         // should stay in the same state, and not start distributing
104         assertNull(state.process(msg));
105         verify(mgr, never()).startDistributing(any());
106         verify(mgr, never()).goActive();
107         verify(mgr, never()).goInactive();
108
109         // info should be unchanged
110         assertEquals(MY_HOST, state.getLeader());
111         assertEquals(ASGN3, state.getAssignments());
112     }
113
114     @Test
115     void testProcessLeader_SameLeader() {
116         String[] arr = {HOST2, PREV_HOST, MY_HOST};
117         BucketAssignments asgn = new BucketAssignments(arr);
118
119         // identify a leader that's better than my host
120         assertNull(state.process(new Identification(PREV_HOST, asgn)));
121
122         // now send a Leader message for that leader
123         Leader msg = new Leader(PREV_HOST, asgn);
124
125         State next = mock(State.class);
126         when(mgr.goActive()).thenReturn(next);
127
128         // should go Active and start distributing
129         assertEquals(next, state.process(msg));
130         verify(mgr, never()).goInactive();
131
132         // Ident msg + Leader msg = times(2)
133         verify(mgr, times(2)).startDistributing(asgn);
134     }
135
136     @Test
137     void testProcessLeader_BetterLeaderWithAssignment() {
138         String[] arr = {HOST2, PREV_HOST, MY_HOST};
139         BucketAssignments asgn = new BucketAssignments(arr);
140         Leader msg = new Leader(PREV_HOST, asgn);
141
142         State next = mock(State.class);
143         when(mgr.goActive()).thenReturn(next);
144
145         // should go Active and start distributing
146         assertEquals(next, state.process(msg));
147         verify(mgr).startDistributing(asgn);
148         verify(mgr, never()).goInactive();
149     }
150
151     @Test
152     void testProcessLeader_BetterLeaderWithoutAssignment() {
153         String[] arr = {HOST2, PREV_HOST, HOST1};
154         BucketAssignments asgn = new BucketAssignments(arr);
155         Leader msg = new Leader(PREV_HOST, asgn);
156
157         State next = mock(State.class);
158         when(mgr.goInactive()).thenReturn(next);
159
160         // should go Inactive, but start distributing
161         assertEquals(next, state.process(msg));
162         verify(mgr).startDistributing(asgn);
163         verify(mgr, never()).goActive();
164     }
165
166     @Test
167     void testProcessLeader_NotABetterLeader() {
168         // no assignments yet
169         mgr.startDistributing(null);
170         state = new QueryState(mgr);
171
172         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
173         Leader msg = new Leader(HOST1, asgn);
174
175         State next = mock(State.class);
176         when(mgr.goInactive()).thenReturn(next);
177
178         // should stay in the same state
179         assertNull(state.process(msg));
180         verify(mgr, never()).goActive();
181         verify(mgr, never()).goInactive();
182
183         // should have started distributing
184         verify(mgr).startDistributing(asgn);
185
186         // this host should still be the leader
187         assertEquals(MY_HOST, state.getLeader());
188
189         // new assignments
190         assertEquals(asgn, state.getAssignments());
191     }
192
193     @Test
194     void testProcessOffline_NullHost() {
195         assertNull(state.process(new Offline()));
196         assertEquals(MY_HOST, state.getLeader());
197     }
198
199     @Test
200     void testProcessOffline_SameHost() {
201         assertNull(state.process(new Offline(MY_HOST)));
202         assertEquals(MY_HOST, state.getLeader());
203     }
204
205     @Test
206     void testProcessOffline_DiffHost() {
207         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, HOST1});
208         mgr.startDistributing(asgn);
209         state = new QueryState(mgr);
210
211         // tell it that the hosts are alive
212         state.process(new Identification(PREV_HOST, asgn));
213         state.process(new Identification(HOST1, asgn));
214
215         // #2 goes offline
216         assertNull(state.process(new Offline(HOST1)));
217
218         // #1 should still be the leader
219         assertEquals(PREV_HOST, state.getLeader());
220
221         // #1 goes offline
222         assertNull(state.process(new Offline(PREV_HOST)));
223
224         // this should still be the leader now
225         assertEquals(MY_HOST, state.getLeader());
226     }
227
228     @Test
229     void testQueryState() {
230         /*
231          * Prove the state is attached to the manager by invoking getHost(), which
232          * delegates to the manager.
233          */
234         assertEquals(MY_HOST, state.getHost());
235     }
236
237     @Test
238     void testAwaitIdentification_MissingSelfIdent() {
239         state.start();
240
241         Pair<Long, StateTimerTask> timer = onceTasks.remove();
242
243         assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
244         assertNotNull(timer.getRight());
245
246         // should published an Offline message and go inactive
247
248         State next = mock(State.class);
249         when(mgr.goStart()).thenReturn(next);
250
251         assertEquals(next, timer.getRight().fire());
252
253         // should continue distributing
254         verify(mgr, never()).startDistributing(null);
255
256         Offline msg = captureAdminMessage(Offline.class);
257         assertEquals(MY_HOST, msg.getSource());
258     }
259
260     @Test
261     void testAwaitIdentification_Leader() {
262         state.start();
263         state.process(new Identification(MY_HOST, null));
264
265         Pair<Long, StateTimerTask> timer = onceTasks.remove();
266
267         assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
268         assertNotNull(timer.getRight());
269
270         State next = mock(State.class);
271         when(mgr.goActive()).thenReturn(next);
272
273         assertEquals(next, timer.getRight().fire());
274
275         // should have published a Leader message
276         Leader msg = captureAdminMessage(Leader.class);
277         assertEquals(MY_HOST, msg.getSource());
278         assertTrue(msg.getAssignments().hasAssignment(MY_HOST));
279     }
280
281     @Test
282     void testAwaitIdentification_HasAssignment() {
283         // not the leader, but has an assignment
284         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
285         mgr.startDistributing(asgn);
286         state = new QueryState(mgr);
287
288         state.start();
289         state.process(new Identification(MY_HOST, null));
290
291         // tell it the leader is still active
292         state.process(new Identification(PREV_HOST, asgn));
293
294         Pair<Long, StateTimerTask> timer = onceTasks.remove();
295
296         assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
297         assertNotNull(timer.getRight());
298
299         // set up active state, as that's what it should return
300         State next = mock(State.class);
301         when(mgr.goActive()).thenReturn(next);
302
303         assertEquals(next, timer.getRight().fire());
304
305         // should NOT have published a Leader message
306         assertTrue(admin.isEmpty());
307
308         // should have gone active with the current assignments
309         verify(mgr).goActive();
310     }
311
312     @Test
313     void testAwaitIdentification_NoAssignment() {
314         // not the leader and no assignment
315         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
316         mgr.startDistributing(asgn);
317         state = new QueryState(mgr);
318
319         state.start();
320         state.process(new Identification(MY_HOST, null));
321
322         // tell it the leader is still active
323         state.process(new Identification(PREV_HOST, asgn));
324
325         Pair<Long, StateTimerTask> timer = onceTasks.remove();
326
327         assertEquals(STD_IDENTIFICATION_MS, timer.getLeft().longValue());
328         assertNotNull(timer.getRight());
329
330         // set up inactive state, as that's what it should return
331         State next = mock(State.class);
332         when(mgr.goInactive()).thenReturn(next);
333
334         assertEquals(next, timer.getRight().fire());
335
336         // should NOT have published a Leader message
337         assertTrue(admin.isEmpty());
338     }
339
340     @Test
341     void testRecordInfo_NullSource() {
342         state.setAssignments(ASGN3);
343         state.setLeader(MY_HOST);
344
345         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
346         state.process(new Identification(null, asgn));
347
348         // leader unchanged
349         assertEquals(MY_HOST, state.getLeader());
350
351         // assignments still updated
352         assertEquals(asgn, state.getAssignments());
353     }
354
355     @Test
356     void testRecordInfo_SourcePreceedsMyHost() {
357         state.setAssignments(ASGN3);
358         state.setLeader(MY_HOST);
359
360         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
361         state.process(new Identification(PREV_HOST, asgn));
362
363         // new leader
364         assertEquals(PREV_HOST, state.getLeader());
365
366         // assignments still updated
367         assertEquals(asgn, state.getAssignments());
368     }
369
370     @Test
371     void testRecordInfo_SourceFollowsMyHost() {
372         mgr.startDistributing(null);
373         state.setLeader(MY_HOST);
374
375         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
376         state.process(new Identification(HOST1, asgn));
377
378         // leader unchanged
379         assertEquals(MY_HOST, state.getLeader());
380
381         // assignments still updated
382         assertEquals(asgn, state.getAssignments());
383     }
384
385     @Test
386     void testRecordInfo_NewIsNull() {
387         state.setAssignments(ASGN3);
388         state.process(new Identification(HOST1, null));
389
390         assertEquals(ASGN3, state.getAssignments());
391     }
392
393     @Test
394     void testRecordInfo_NewIsEmpty() {
395         state.setAssignments(ASGN3);
396         state.process(new Identification(PREV_HOST, new BucketAssignments()));
397
398         assertEquals(ASGN3, state.getAssignments());
399     }
400
401     @Test
402     void testRecordInfo_OldIsNull() {
403         mgr.startDistributing(null);
404
405         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
406         state.process(new Identification(HOST1, asgn));
407
408         assertEquals(asgn, state.getAssignments());
409     }
410
411     @Test
412     void testRecordInfo_OldIsEmpty() {
413         state.setAssignments(new BucketAssignments());
414
415         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
416         state.process(new Identification(HOST1, asgn));
417
418         assertEquals(asgn, state.getAssignments());
419     }
420
421     @Test
422     void testRecordInfo_NewLeaderPreceedsOld() {
423         state.setAssignments(ASGN3);
424         state.setLeader(MY_HOST);
425
426         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
427         state.process(new Identification(HOST3, asgn));
428
429         assertEquals(asgn, state.getAssignments());
430     }
431
432     @Test
433     void testRecordInfo_NewLeaderSucceedsOld() {
434         state.setAssignments(ASGN3);
435         state.setLeader(MY_HOST);
436
437         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, HOST3});
438         state.process(new Identification(HOST3, asgn));
439
440         // should be unchanged
441         assertEquals(ASGN3, state.getAssignments());
442     }
443
444 }