Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / state / ActiveStateTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2020 Nordix Foundation
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.ArgumentMatchers.anyLong;
30 import static org.mockito.ArgumentMatchers.anyString;
31 import static org.mockito.Mockito.atLeast;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.never;
34 import static org.mockito.Mockito.times;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.when;
37
38 import java.util.Arrays;
39 import org.apache.commons.lang3.tuple.Pair;
40 import org.apache.commons.lang3.tuple.Triple;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.onap.policy.drools.pooling.message.BucketAssignments;
44 import org.onap.policy.drools.pooling.message.Heartbeat;
45 import org.onap.policy.drools.pooling.message.Leader;
46 import org.onap.policy.drools.pooling.message.Offline;
47 import org.onap.policy.drools.pooling.message.Query;
48
49 public class ActiveStateTest extends SupportBasicStateTester {
50
51     private ActiveState state;
52
53     /**
54      * Setup.
55      */
56     @Override
57     @Before
58     public void setUp() throws Exception {
59         super.setUp();
60
61         state = new ActiveState(mgr);
62     }
63
64     @Test
65     public void testStart() {
66         state.start();
67
68         // ensure the timers were created
69         verify(mgr, atLeast(1)).scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class));
70
71         // ensure a heart beat was generated
72         Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
73         assertEquals(MY_HOST, msg.getRight().getSource());
74     }
75
76     @Test
77     public void testProcessHeartbeat_NullHost() {
78         assertNull(state.process(new Heartbeat()));
79
80         assertFalse(state.isMyHeartbeatSeen());
81         assertFalse(state.isPredHeartbeatSeen());
82
83         verify(mgr, never()).goInactive();
84         verify(mgr, never()).goQuery();
85     }
86
87     @Test
88     public void testProcessHeartbeat_MyHost() {
89         assertNull(state.process(new Heartbeat(MY_HOST, 0L)));
90
91         assertTrue(state.isMyHeartbeatSeen());
92         assertFalse(state.isPredHeartbeatSeen());
93
94         verify(mgr, never()).goInactive();
95         verify(mgr, never()).goQuery();
96     }
97
98     @Test
99     public void testProcessHeartbeat_Predecessor() {
100         assertNull(state.process(new Heartbeat(HOST2, 0L)));
101
102         assertFalse(state.isMyHeartbeatSeen());
103         assertTrue(state.isPredHeartbeatSeen());
104
105         verify(mgr, never()).goInactive();
106         verify(mgr, never()).goQuery();
107     }
108
109     @Test
110     public void testProcessHeartbeat_OtherHost() {
111         assertNull(state.process(new Heartbeat(HOST3, 0L)));
112
113         assertFalse(state.isMyHeartbeatSeen());
114         assertFalse(state.isPredHeartbeatSeen());
115
116         verify(mgr, never()).goInactive();
117         verify(mgr, never()).goQuery();
118     }
119
120     @Test
121     public void testProcessOffline_NullHost() {
122         // should be ignored
123         assertNull(state.process(new Offline()));
124     }
125
126     @Test
127     public void testProcessOffline_UnassignedHost() {
128         // HOST4 is not in the assignment list - should be ignored
129         assertNull(state.process(new Offline(HOST4)));
130     }
131
132     @Test
133     public void testProcessOffline_IAmLeader() {
134         // configure the next state
135         State next = mock(State.class);
136         when(mgr.goActive()).thenReturn(next);
137
138         // one of the assigned hosts went offline
139         assertEquals(next, state.process(new Offline(HOST1)));
140
141         // should have sent a new Leader message
142         Leader msg = captureAdminMessage(Leader.class);
143
144         assertEquals(MY_HOST, msg.getSource());
145
146         // check new bucket assignments
147         assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST2), Arrays.asList(msg.getAssignments().getHostArray()));
148     }
149
150     @Test
151     public void testProcessOffline_PredecessorIsLeaderNowOffline() {
152         // configure the next state
153         State next = mock(State.class);
154         when(mgr.goActive()).thenReturn(next);
155
156         // I am not the leader, but my predecessor was
157         mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
158         state = new ActiveState(mgr);
159
160         // my predecessor went offline
161         assertEquals(next, state.process(new Offline(PREV_HOST)));
162
163         // should have sent a new Leader message
164         Leader msg = captureAdminMessage(Leader.class);
165
166         assertEquals(MY_HOST, msg.getSource());
167
168         // check new bucket assignments
169         assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST1), Arrays.asList(msg.getAssignments().getHostArray()));
170     }
171
172     @Test
173     public void testProcessOffline__PredecessorIsNotLeaderNowOffline() {
174         // I am not the leader, and neither is my predecessor
175         mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, PREV_HOST2}));
176         state = new ActiveState(mgr);
177
178         /*
179          *
180          * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader thus
181          * should be ignored.
182          */
183         assertNull(state.process(new Offline(PREV_HOST2)));
184     }
185
186     @Test
187     public void testProcessOffline_OtherAssignedHostOffline() {
188         // I am not the leader
189         mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
190         state = new ActiveState(mgr);
191
192         /*
193          * HOST1 has buckets, but it isn't the leader and it isn't my predecessor, thus
194          * should be ignored.
195          */
196         assertNull(state.process(new Offline(HOST1)));
197     }
198
199     @Test
200     public void testProcessLeader_Invalid() {
201         Leader msg = new Leader(PREV_HOST, null);
202
203         // should stay in the same state, and not start distributing
204         assertNull(state.process(msg));
205         verify(mgr, never()).startDistributing(any());
206         verify(mgr, never()).goActive();
207         verify(mgr, never()).goInactive();
208
209         // info should be unchanged
210         assertEquals(MY_HOST, state.getLeader());
211         assertEquals(ASGN3, state.getAssignments());
212     }
213
214     @Test
215     public void testProcessLeader_BadLeader() {
216         String[] arr = {HOST2, HOST1};
217         BucketAssignments asgn = new BucketAssignments(arr);
218
219         // now send a Leader message for that leader
220         Leader msg = new Leader(HOST1, asgn);
221
222         State next = mock(State.class);
223         when(mgr.goQuery()).thenReturn(next);
224
225         // should go Query, but not start distributing
226         assertEquals(next, state.process(msg));
227         verify(mgr, never()).startDistributing(asgn);
228     }
229
230     @Test
231     public void testProcessLeader_GoodLeader() {
232         String[] arr = {HOST2, PREV_HOST, MY_HOST};
233         BucketAssignments asgn = new BucketAssignments(arr);
234
235         // now send a Leader message for that leader
236         Leader msg = new Leader(PREV_HOST, asgn);
237
238         State next = mock(State.class);
239         when(mgr.goActive()).thenReturn(next);
240
241         // should go Active and start distributing
242         assertEquals(next, state.process(msg));
243         verify(mgr).startDistributing(asgn);
244     }
245
246     @Test
247     public void testActiveState() {
248         assertEquals(MY_HOST, state.getLeader());
249         assertEquals(ASGN3, state.getAssignments());
250
251         // verify that it determined its neighbors
252         assertEquals(HOST1, state.getSuccHost());
253         assertEquals(HOST2, state.getPredHost());
254     }
255
256     @Test
257     public void testDetmNeighbors() {
258         // if only one host (i.e., itself)
259         mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
260         state = new ActiveState(mgr);
261         assertEquals(null, state.getSuccHost());
262         assertEquals("", state.getPredHost());
263
264         // two hosts
265         mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, HOST2}));
266         state = new ActiveState(mgr);
267         assertEquals(HOST2, state.getSuccHost());
268         assertEquals(HOST2, state.getPredHost());
269
270         // three hosts
271         mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2}));
272         state = new ActiveState(mgr);
273         assertEquals(HOST2, state.getSuccHost());
274         assertEquals(HOST3, state.getPredHost());
275
276         // more hosts
277         mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2, HOST4}));
278         state = new ActiveState(mgr);
279         assertEquals(HOST2, state.getSuccHost());
280         assertEquals(HOST4, state.getPredHost());
281     }
282
283     @Test
284     public void testAddTimers_WithPredecessor() {
285         // invoke start() to add the timers
286         state.start();
287
288         assertEquals(3, repeatedSchedules.size());
289
290         Triple<Long, Long, StateTimerTask> timer;
291
292         // heart beat generator
293         timer = repeatedTasks.remove();
294         assertEquals(STD_INTER_HEARTBEAT_MS, timer.getLeft().longValue());
295         assertEquals(STD_INTER_HEARTBEAT_MS, timer.getMiddle().longValue());
296
297         // my heart beat checker
298         timer = repeatedTasks.remove();
299         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getLeft().longValue());
300         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getMiddle().longValue());
301
302         // predecessor's heart beat checker
303         timer = repeatedTasks.remove();
304         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getLeft().longValue());
305         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getMiddle().longValue());
306     }
307
308     @Test
309     public void testAddTimers_SansPredecessor() {
310         // only one host, thus no predecessor
311         mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
312         state = new ActiveState(mgr);
313
314         // invoke start() to add the timers
315         state.start();
316
317         assertEquals(2, repeatedSchedules.size());
318
319         Triple<Long, Long, StateTimerTask> timer;
320
321         // heart beat generator
322         timer = repeatedTasks.remove();
323         assertEquals(STD_INTER_HEARTBEAT_MS, timer.getLeft().longValue());
324         assertEquals(STD_INTER_HEARTBEAT_MS, timer.getMiddle().longValue());
325
326         // my heart beat checker
327         timer = repeatedTasks.remove();
328         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getLeft().longValue());
329         assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getMiddle().longValue());
330     }
331
332     @Test
333     public void testAddTimers_HeartbeatGenerator() {
334         // only one host so we only have to look at one heart beat at a time
335         mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
336         state = new ActiveState(mgr);
337
338         // invoke start() to add the timers
339         state.start();
340
341         Triple<Long, Long, StateTimerTask> task = repeatedTasks.remove();
342
343         verify(mgr).publish(anyString(), any(Heartbeat.class));
344
345         // fire the task
346         assertNull(task.getRight().fire());
347
348         // should have generated a second pair of heart beats
349         verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class));
350
351         Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
352         assertEquals(MY_HOST, msg.getLeft());
353         assertEquals(MY_HOST, msg.getRight().getSource());
354     }
355
356     @Test
357     public void testAddTimers_MyHeartbeatSeen() {
358         // invoke start() to add the timers
359         state.start();
360
361         Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
362
363         // indicate that this host is still alive
364         state.process(new Heartbeat(MY_HOST, 0L));
365
366         // set up next state
367         State next = mock(State.class);
368         when(mgr.goInactive()).thenReturn(next);
369
370         // fire the task - should not transition
371         assertNull(task.getRight().fire());
372
373         verify(mgr, never()).publishAdmin(any(Query.class));
374     }
375
376     @Test
377     public void testAddTimers_MyHeartbeatMissed() {
378         // invoke start() to add the timers
379         state.start();
380
381         Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
382
383         // set up next state
384         State next = mock(State.class);
385         when(mgr.goStart()).thenReturn(next);
386
387         // fire the task - should transition
388         assertEquals(next, task.getRight().fire());
389
390         // should continue to distribute
391         verify(mgr, never()).startDistributing(null);
392
393         // should publish an offline message
394         Offline msg = captureAdminMessage(Offline.class);
395         assertEquals(MY_HOST, msg.getSource());
396     }
397
398     @Test
399     public void testAddTimers_PredecessorHeartbeatSeen() {
400         // invoke start() to add the timers
401         state.start();
402
403         Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
404
405         // indicate that the predecessor is still alive
406         state.process(new Heartbeat(HOST2, 0L));
407
408         // set up next state, just in case
409         State next = mock(State.class);
410         when(mgr.goQuery()).thenReturn(next);
411
412         // fire the task - should NOT transition
413         assertNull(task.getRight().fire());
414
415         verify(mgr, never()).publishAdmin(any(Query.class));
416     }
417
418     @Test
419     public void testAddTimers_PredecessorHeartbeatMissed() {
420         // invoke start() to add the timers
421         state.start();
422
423         Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
424
425         // set up next state
426         State next = mock(State.class);
427         when(mgr.goQuery()).thenReturn(next);
428
429         // fire the task - should transition
430         assertEquals(next, task.getRight().fire());
431
432         verify(mgr).publishAdmin(any(Query.class));
433     }
434
435     @Test
436     public void testGenHeartbeat_OneHost() {
437         // only one host (i.e., itself)
438         mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
439         state = new ActiveState(mgr);
440
441         state.start();
442
443         verify(mgr, times(1)).publish(any(), any());
444
445         Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
446         assertEquals(MY_HOST, msg.getLeft());
447         assertEquals(MY_HOST, msg.getRight().getSource());
448     }
449
450     @Test
451     public void testGenHeartbeat_MultipleHosts() {
452         state.start();
453
454         verify(mgr, times(2)).publish(any(), any());
455
456         Pair<String, Heartbeat> msg;
457         int index = 0;
458
459         // this message should go to itself
460         msg = capturePublishedMessage(Heartbeat.class, index++);
461         assertEquals(MY_HOST, msg.getLeft());
462         assertEquals(MY_HOST, msg.getRight().getSource());
463
464         // this message should go to its successor
465         msg = capturePublishedMessage(Heartbeat.class, index++);
466         assertEquals(HOST1, msg.getLeft());
467         assertEquals(MY_HOST, msg.getRight().getSource());
468     }
469
470 }