Close old UEB/DMaaP consumer
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / state / QueryStateTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling.state;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34 import java.util.Map;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.onap.policy.drools.pooling.message.BucketAssignments;
38 import org.onap.policy.drools.pooling.message.Identification;
39 import org.onap.policy.drools.pooling.message.Leader;
40 import org.onap.policy.drools.pooling.message.Message;
41 import org.onap.policy.drools.pooling.message.Offline;
42 import org.onap.policy.drools.utils.Pair;
43
44 public class QueryStateTest extends BasicStateTester {
45
46     private QueryState state;
47
48     @Before
49     public void setUp() throws Exception {
50         super.setUp();
51
52         state = new QueryState(mgr);
53     }
54
55     @Test
56     public void testGetFilter() {
57         Map<String, Object> filter = state.getFilter();
58
59         FilterUtilsTest utils = new FilterUtilsTest();
60
61         utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
62         utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
63         utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
64     }
65
66     @Test
67     public void testGoQuery() {
68         assertNull(state.goQuery());
69     }
70
71     @Test
72     public void testStart() {
73         state.start();
74
75         Pair<Long, StateTimerTask> timer = onceTasks.remove();
76
77         assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
78         assertNotNull(timer.second());
79     }
80
81     @Test
82     public void testProcessIdentification_SameSource() {
83         String[] arr = {HOST2, PREV_HOST, MY_HOST};
84         BucketAssignments asgn = new BucketAssignments(arr);
85
86         assertNull(state.process(new Identification(MY_HOST, asgn)));
87
88         // info should be unchanged
89         assertEquals(MY_HOST, state.getLeader());
90         verify(mgr, never()).startDistributing(asgn);
91     }
92
93     @Test
94     public void testProcessIdentification_DiffSource() {
95         String[] arr = {HOST2, PREV_HOST, MY_HOST};
96         BucketAssignments asgn = new BucketAssignments(arr);
97
98         assertNull(state.process(new Identification(HOST2, asgn)));
99
100         // leader should be unchanged
101         assertEquals(MY_HOST, state.getLeader());
102
103         // should have picked up the assignments
104         verify(mgr).startDistributing(asgn);
105     }
106
107     @Test
108     public void testProcessLeader_Invalid() {
109         Leader msg = new Leader(PREV_HOST, null);
110
111         // should stay in the same state, and not start distributing
112         assertNull(state.process(msg));
113         verify(mgr, never()).startDistributing(any());
114         verify(mgr, never()).goActive();
115         verify(mgr, never()).goInactive();
116
117         // info should be unchanged
118         assertEquals(MY_HOST, state.getLeader());
119         assertEquals(ASGN3, state.getAssignments());
120     }
121
122     @Test
123     public void testProcessLeader_SameLeader() {
124         String[] arr = {HOST2, PREV_HOST, MY_HOST};
125         BucketAssignments asgn = new BucketAssignments(arr);
126
127         // identify a leader that's better than my host
128         assertEquals(null, state.process(new Identification(PREV_HOST, asgn)));
129
130         // now send a Leader message for that leader
131         Leader msg = new Leader(PREV_HOST, asgn);
132
133         State next = mock(State.class);
134         when(mgr.goActive()).thenReturn(next);
135
136         // should go Active and start distributing
137         assertEquals(next, state.process(msg));
138         verify(mgr, never()).goInactive();
139
140         // Ident msg + Leader msg = times(2)
141         verify(mgr, times(2)).startDistributing(asgn);
142     }
143
144     @Test
145     public void testProcessLeader_BetterLeaderWithAssignment() {
146         String[] arr = {HOST2, PREV_HOST, MY_HOST};
147         BucketAssignments asgn = new BucketAssignments(arr);
148         Leader msg = new Leader(PREV_HOST, asgn);
149
150         State next = mock(State.class);
151         when(mgr.goActive()).thenReturn(next);
152
153         // should go Active and start distributing
154         assertEquals(next, state.process(msg));
155         verify(mgr).startDistributing(asgn);
156         verify(mgr, never()).goInactive();
157     }
158
159     @Test
160     public void testProcessLeader_BetterLeaderWithoutAssignment() {
161         String[] arr = {HOST2, PREV_HOST, HOST1};
162         BucketAssignments asgn = new BucketAssignments(arr);
163         Leader msg = new Leader(PREV_HOST, asgn);
164
165         State next = mock(State.class);
166         when(mgr.goInactive()).thenReturn(next);
167
168         // should go Inactive, but start distributing
169         assertEquals(next, state.process(msg));
170         verify(mgr).startDistributing(asgn);
171         verify(mgr, never()).goActive();
172     }
173
174     @Test
175     public void testProcessLeader_NotABetterLeader() {
176         // no assignments yet
177         mgr.startDistributing(null);
178         state = new QueryState(mgr);
179
180         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
181         Leader msg = new Leader(HOST1, asgn);
182
183         State next = mock(State.class);
184         when(mgr.goInactive()).thenReturn(next);
185
186         // should stay in the same state
187         assertNull(state.process(msg));
188         verify(mgr, never()).goActive();
189         verify(mgr, never()).goInactive();
190
191         // should have started distributing
192         verify(mgr).startDistributing(asgn);
193
194         // this host should still be the leader
195         assertEquals(MY_HOST, state.getLeader());
196
197         // new assignments
198         assertEquals(asgn, state.getAssignments());
199     }
200
201     @Test
202     public void testProcessOffline_NullHost() {
203         assertNull(state.process(new Offline()));
204         assertEquals(MY_HOST, state.getLeader());
205     }
206
207     @Test
208     public void testProcessOffline_SameHost() {
209         assertNull(state.process(new Offline(MY_HOST)));
210         assertEquals(MY_HOST, state.getLeader());
211     }
212
213     @Test
214     public void testProcessOffline_DiffHost() {
215         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, HOST1});
216         mgr.startDistributing(asgn);
217         state = new QueryState(mgr);
218
219         // tell it that the hosts are alive
220         state.process(new Identification(PREV_HOST, asgn));
221         state.process(new Identification(HOST1, asgn));
222
223         // #2 goes offline
224         assertNull(state.process(new Offline(HOST1)));
225
226         // #1 should still be the leader
227         assertEquals(PREV_HOST, state.getLeader());
228
229         // #1 goes offline
230         assertNull(state.process(new Offline(PREV_HOST)));
231
232         // this should still be the leader now
233         assertEquals(MY_HOST, state.getLeader());
234     }
235
236     @Test
237     public void testQueryState() {
238         /*
239          * Prove the state is attached to the manager by invoking getHost(), which
240          * delegates to the manager.
241          */
242         assertEquals(MY_HOST, state.getHost());
243     }
244
245     @Test
246     public void testAwaitIdentification_MissingSelfIdent() {
247         state.start();
248
249         Pair<Long, StateTimerTask> timer = onceTasks.remove();
250
251         assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
252         assertNotNull(timer.second());
253
254         // should published an Offline message and go inactive
255
256         State next = mock(State.class);
257         when(mgr.goInactive()).thenReturn(next);
258
259         assertEquals(next, timer.second().fire());
260
261         Offline msg = captureAdminMessage(Offline.class);
262         assertEquals(MY_HOST, msg.getSource());
263     }
264
265     @Test
266     public void testAwaitIdentification_Leader() {
267         state.start();
268         state.process(new Identification(MY_HOST, null));
269
270         Pair<Long, StateTimerTask> timer = onceTasks.remove();
271
272         assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
273         assertNotNull(timer.second());
274
275         State next = mock(State.class);
276         when(mgr.goActive()).thenReturn(next);
277
278         assertEquals(next, timer.second().fire());
279
280         // should have published a Leader message
281         Leader msg = captureAdminMessage(Leader.class);
282         assertEquals(MY_HOST, msg.getSource());
283         assertTrue(msg.getAssignments().hasAssignment(MY_HOST));
284     }
285
286     @Test
287     public void testAwaitIdentification_HasAssignment() {
288         // not the leader, but has an assignment
289         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
290         mgr.startDistributing(asgn);
291         state = new QueryState(mgr);
292
293         state.start();
294         state.process(new Identification(MY_HOST, null));
295
296         // tell it the leader is still active
297         state.process(new Identification(PREV_HOST, asgn));
298
299         Pair<Long, StateTimerTask> timer = onceTasks.remove();
300
301         assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
302         assertNotNull(timer.second());
303
304         // set up active state, as that's what it should return
305         State next = mock(State.class);
306         when(mgr.goActive()).thenReturn(next);
307
308         assertEquals(next, timer.second().fire());
309
310         // should NOT have published a Leader message
311         assertTrue(admin.isEmpty());
312
313         // should have gone active with the current assignments
314         verify(mgr).goActive();
315     }
316
317     @Test
318     public void testAwaitIdentification_NoAssignment() {
319         // not the leader and no assignment
320         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
321         mgr.startDistributing(asgn);
322         state = new QueryState(mgr);
323
324         state.start();
325         state.process(new Identification(MY_HOST, null));
326
327         // tell it the leader is still active
328         state.process(new Identification(PREV_HOST, asgn));
329
330         Pair<Long, StateTimerTask> timer = onceTasks.remove();
331
332         assertEquals(STD_IDENTIFICATION_MS, timer.first().longValue());
333         assertNotNull(timer.second());
334
335         // set up inactive state, as that's what it should return
336         State next = mock(State.class);
337         when(mgr.goInactive()).thenReturn(next);
338
339         assertEquals(next, timer.second().fire());
340
341         // should NOT have published a Leader message
342         assertTrue(admin.isEmpty());
343     }
344
345     @Test
346     public void testHasAssignment() {
347         // null assignment
348         mgr.startDistributing(null);
349         assertFalse(state.hasAssignment());
350
351         // not in assignments
352         state.setAssignments(new BucketAssignments(new String[] {HOST3}));
353         assertFalse(state.hasAssignment());
354
355         // it IS in the assignments
356         state.setAssignments(new BucketAssignments(new String[] {MY_HOST}));
357         assertTrue(state.hasAssignment());
358     }
359
360     @Test
361     public void testRecordInfo_NullSource() {
362         state.setAssignments(ASGN3);
363         state.setLeader(MY_HOST);
364
365         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
366         state.process(new Identification(null, asgn));
367
368         // leader unchanged
369         assertEquals(MY_HOST, state.getLeader());
370
371         // assignments still updated
372         assertEquals(asgn, state.getAssignments());
373     }
374
375     @Test
376     public void testRecordInfo_SourcePreceedsMyHost() {
377         state.setAssignments(ASGN3);
378         state.setLeader(MY_HOST);
379
380         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
381         state.process(new Identification(PREV_HOST, asgn));
382
383         // new leader
384         assertEquals(PREV_HOST, state.getLeader());
385
386         // assignments still updated
387         assertEquals(asgn, state.getAssignments());
388     }
389
390     @Test
391     public void testRecordInfo_SourceFollowsMyHost() {
392         mgr.startDistributing(null);
393         state.setLeader(MY_HOST);
394
395         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
396         state.process(new Identification(HOST1, asgn));
397
398         // leader unchanged
399         assertEquals(MY_HOST, state.getLeader());
400
401         // assignments still updated
402         assertEquals(asgn, state.getAssignments());
403     }
404
405     @Test
406     public void testRecordInfo_NewIsNull() {
407         state.setAssignments(ASGN3);
408         state.process(new Identification(HOST1, null));
409
410         assertEquals(ASGN3, state.getAssignments());
411     }
412
413     @Test
414     public void testRecordInfo_NewIsEmpty() {
415         state.setAssignments(ASGN3);
416         state.process(new Identification(PREV_HOST, new BucketAssignments()));
417
418         assertEquals(ASGN3, state.getAssignments());
419     }
420
421     @Test
422     public void testRecordInfo_OldIsNull() {
423         mgr.startDistributing(null);
424
425         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
426         state.process(new Identification(HOST1, asgn));
427
428         assertEquals(asgn, state.getAssignments());
429     }
430
431     @Test
432     public void testRecordInfo_OldIsEmpty() {
433         state.setAssignments(new BucketAssignments());
434
435         BucketAssignments asgn = new BucketAssignments(new String[] {HOST1, HOST2});
436         state.process(new Identification(HOST1, asgn));
437
438         assertEquals(asgn, state.getAssignments());
439     }
440
441     @Test
442     public void testRecordInfo_NewLeaderPreceedsOld() {
443         state.setAssignments(ASGN3);
444         state.setLeader(MY_HOST);
445
446         BucketAssignments asgn = new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST2});
447         state.process(new Identification(HOST3, asgn));
448
449         assertEquals(asgn, state.getAssignments());
450     }
451
452     @Test
453     public void testRecordInfo_NewLeaderSucceedsOld() {
454         state.setAssignments(ASGN3);
455         state.setLeader(MY_HOST);
456
457         BucketAssignments asgn = new BucketAssignments(new String[] {HOST2, HOST3});
458         state.process(new Identification(HOST3, asgn));
459
460         // should be unchanged
461         assertEquals(ASGN3, state.getAssignments());
462     }
463
464 }