2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2020 Nordix Foundation
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling.state;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.ArgumentMatchers.anyLong;
30 import static org.mockito.ArgumentMatchers.anyString;
31 import static org.mockito.Mockito.atLeast;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.never;
34 import static org.mockito.Mockito.times;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.when;
38 import java.util.Arrays;
39 import org.apache.commons.lang3.tuple.Pair;
40 import org.apache.commons.lang3.tuple.Triple;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.onap.policy.drools.pooling.message.BucketAssignments;
44 import org.onap.policy.drools.pooling.message.Heartbeat;
45 import org.onap.policy.drools.pooling.message.Leader;
46 import org.onap.policy.drools.pooling.message.Offline;
47 import org.onap.policy.drools.pooling.message.Query;
49 public class ActiveStateTest extends SupportBasicStateTester {
51 private ActiveState state;
// Fixture setup: builds the ActiveState under test around the shared "mgr".
// NOTE(review): "mgr" is not declared in this class; presumably it is the mock
// manager supplied by SupportBasicStateTester - confirm.
58 public void setUp() throws Exception {
61 state = new ActiveState(mgr);
// Checks the observable effects of starting the state: at least one fixed-delay
// timer is scheduled on the manager, and a Heartbeat whose source is MY_HOST is
// published.
65 public void testStart() {
68 // ensure the timers were created
69 verify(mgr, atLeast(1)).scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class));
71 // ensure a heart beat was generated
// the right-hand element of the captured pair is the Heartbeat itself
72 Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
73 assertEquals(MY_HOST, msg.getRight().getSource());
// A Heartbeat built with the no-arg constructor must be a no-op: process()
// returns null (no state change), neither "heartbeat seen" flag is set, and the
// manager is never asked to go Inactive or Query.
77 public void testProcessHeartbeat_NullHost() {
78 assertNull(state.process(new Heartbeat()));
80 assertFalse(state.isMyHeartbeatSeen());
81 assertFalse(state.isPredHeartbeatSeen());
83 verify(mgr, never()).goInactive();
84 verify(mgr, never()).goQuery();
// A Heartbeat sourced from MY_HOST marks this host's own heartbeat as seen,
// leaves the predecessor flag untouched, returns null from process(), and
// triggers no Inactive/Query transition.
88 public void testProcessHeartbeat_MyHost() {
89 assertNull(state.process(new Heartbeat(MY_HOST, 0L)));
91 assertTrue(state.isMyHeartbeatSeen());
92 assertFalse(state.isPredHeartbeatSeen());
94 verify(mgr, never()).goInactive();
95 verify(mgr, never()).goQuery();
// A Heartbeat sourced from HOST2 (the predecessor under the default fixture, as
// asserted in testActiveState) marks only the predecessor's heartbeat as seen.
// process() returns null and no Inactive/Query transition is requested.
99 public void testProcessHeartbeat_Predecessor() {
100 assertNull(state.process(new Heartbeat(HOST2, 0L)));
102 assertFalse(state.isMyHeartbeatSeen());
103 assertTrue(state.isPredHeartbeatSeen());
105 verify(mgr, never()).goInactive();
106 verify(mgr, never()).goQuery();
// A Heartbeat sourced from HOST3 (neither this host nor the predecessor) sets
// neither "heartbeat seen" flag; process() returns null and no Inactive/Query
// transition is requested.
110 public void testProcessHeartbeat_OtherHost() {
111 assertNull(state.process(new Heartbeat(HOST3, 0L)));
113 assertFalse(state.isMyHeartbeatSeen());
114 assertFalse(state.isPredHeartbeatSeen());
116 verify(mgr, never()).goInactive();
117 verify(mgr, never()).goQuery();
// An Offline message built with the no-arg constructor is expected to leave the
// state unchanged: process() returns null.
121 public void testProcessOffline_NullHost() {
123 assertNull(state.process(new Offline()));
// An Offline message from a host that holds no buckets is expected to leave the
// state unchanged: process() returns null.
127 public void testProcessOffline_UnassignedHost() {
128 // HOST4 is not in the assignment list - should be ignored
129 assertNull(state.process(new Offline(HOST4)));
// With the default fixture (leader is MY_HOST, as asserted in testActiveState),
// an Offline message for HOST1 must make process() return whatever goActive()
// yields, and must publish a Leader admin message from MY_HOST whose host array
// is {MY_HOST, MY_HOST, HOST2}.
133 public void testProcessOffline_IAmLeader() {
134 // configure the next state
135 State next = mock(State.class);
136 when(mgr.goActive()).thenReturn(next);
138 // one of the assigned hosts went offline
139 assertEquals(next, state.process(new Offline(HOST1)));
141 // should have sent a new Leader message
142 Leader msg = captureAdminMessage(Leader.class);
144 assertEquals(MY_HOST, msg.getSource());
146 // check new bucket assignments
147 assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST2), Arrays.asList(msg.getAssignments().getHostArray()));
// Rebuilds the state over assignments {PREV_HOST, MY_HOST, HOST1}, then reports
// PREV_HOST offline. Expects process() to return whatever goActive() yields and
// a Leader admin message from MY_HOST whose host array is
// {MY_HOST, MY_HOST, HOST1}.
151 public void testProcessOffline_PredecessorIsLeaderNowOffline() {
152 // configure the next state
153 State next = mock(State.class);
154 when(mgr.goActive()).thenReturn(next);
156 // I am not the leader, but my predecessor was
157 mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
// re-create the state so that it is built after the new assignments were set
158 state = new ActiveState(mgr);
160 // my predecessor went offline
161 assertEquals(next, state.process(new Offline(PREV_HOST)));
163 // should have sent a new Leader message
164 Leader msg = captureAdminMessage(Leader.class);
166 assertEquals(MY_HOST, msg.getSource());
168 // check new bucket assignments
169 assertEquals(Arrays.asList(MY_HOST, MY_HOST, HOST1), Arrays.asList(msg.getAssignments().getHostArray()));
// Rebuilds the state over assignments {PREV_HOST, MY_HOST, PREV_HOST2}, then
// reports PREV_HOST2 offline. Expects process() to return null (no state change).
173 public void testProcessOffline__PredecessorIsNotLeaderNowOffline() {
174 // I am not the leader, and neither is my predecessor
175 mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, PREV_HOST2}));
176 state = new ActiveState(mgr);
180 * PREV_HOST2 has buckets and is my predecessor, but it isn't the leader thus
183 assertNull(state.process(new Offline(PREV_HOST2)));
// Rebuilds the state over assignments {PREV_HOST, MY_HOST, HOST1}, then reports
// HOST1 offline. Expects process() to return null (no state change).
187 public void testProcessOffline_OtherAssignedHostOffline() {
188 // I am not the leader
189 mgr.startDistributing(new BucketAssignments(new String[] {PREV_HOST, MY_HOST, HOST1}));
190 state = new ActiveState(mgr);
193 * HOST1 has buckets, but it isn't the leader and it isn't my predecessor, thus
196 assertNull(state.process(new Offline(HOST1)));
// A Leader message carrying null assignments must be ignored: process() returns
// null, the manager is never told to start distributing or to go Active/Inactive,
// and the state still reports MY_HOST as leader with the ASGN3 assignments.
200 public void testProcessLeader_Invalid() {
201 Leader msg = new Leader(PREV_HOST, null);
203 // should stay in the same state, and not start distributing
204 assertNull(state.process(msg));
205 verify(mgr, never()).startDistributing(any());
206 verify(mgr, never()).goActive();
207 verify(mgr, never()).goInactive();
209 // info should be unchanged
210 assertEquals(MY_HOST, state.getLeader());
211 assertEquals(ASGN3, state.getAssignments());
// Sends a Leader message naming HOST1 as leader with assignments {HOST2, HOST1}.
// Expects process() to return whatever goQuery() yields, and expects the
// manager never to start distributing those assignments.
// NOTE(review): the rule that makes HOST1 a "bad" leader is not visible in this
// test; presumably it depends on how HOST1 compares with the current leader -
// confirm against ActiveState.
215 public void testProcessLeader_BadLeader() {
216 String[] arr = {HOST2, HOST1};
217 BucketAssignments asgn = new BucketAssignments(arr);
219 // now send a Leader message for that leader
220 Leader msg = new Leader(HOST1, asgn);
222 State next = mock(State.class);
223 when(mgr.goQuery()).thenReturn(next);
225 // should go Query, but not start distributing
226 assertEquals(next, state.process(msg));
227 verify(mgr, never()).startDistributing(asgn);
// Sends a Leader message naming PREV_HOST as leader with assignments
// {HOST2, PREV_HOST, MY_HOST}. Expects process() to return whatever goActive()
// yields, and expects the manager to start distributing exactly those
// assignments.
231 public void testProcessLeader_GoodLeader() {
232 String[] arr = {HOST2, PREV_HOST, MY_HOST};
233 BucketAssignments asgn = new BucketAssignments(arr);
235 // now send a Leader message for that leader
236 Leader msg = new Leader(PREV_HOST, asgn);
238 State next = mock(State.class);
239 when(mgr.goActive()).thenReturn(next);
241 // should go Active and start distributing
242 assertEquals(next, state.process(msg));
243 verify(mgr).startDistributing(asgn);
// Checks what a freshly constructed ActiveState reports under the default
// fixture: leader MY_HOST, assignments ASGN3, successor HOST1, predecessor HOST2.
247 public void testActiveState() {
248 assertEquals(MY_HOST, state.getLeader());
249 assertEquals(ASGN3, state.getAssignments());
251 // verify that it determined its neighbors
252 assertEquals(HOST1, state.getSuccHost());
253 assertEquals(HOST2, state.getPredHost());
// Exercises neighbor determination for four assignment layouts. For each one,
// the assignments are installed on the manager, the state is re-created, and
// the reported successor/predecessor hosts are checked.
257 public void testDetmNeighbors() {
258 // if only one host (i.e., itself)
259 mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
260 state = new ActiveState(mgr);
// with no other host, the successor is null while the predecessor is the empty string
261 assertEquals(null, state.getSuccHost());
262 assertEquals("", state.getPredHost());
// two distinct hosts: the other host is both successor and predecessor
265 mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, HOST2}));
266 state = new ActiveState(mgr);
267 assertEquals(HOST2, state.getSuccHost());
268 assertEquals(HOST2, state.getPredHost());
// three distinct hosts: successor is HOST2, predecessor is HOST3
271 mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2}));
272 state = new ActiveState(mgr);
273 assertEquals(HOST2, state.getSuccHost());
274 assertEquals(HOST3, state.getPredHost());
// four distinct hosts: successor is still HOST2, predecessor becomes HOST4
277 mgr.startDistributing(new BucketAssignments(new String[] {HOST3, MY_HOST, HOST2, HOST4}));
278 state = new ActiveState(mgr);
279 assertEquals(HOST2, state.getSuccHost());
280 assertEquals(HOST4, state.getPredHost());
// With the default fixture (which has a predecessor), expects three repeated
// timers, consumed in order: heartbeat generator, own-heartbeat checker,
// predecessor-heartbeat checker. For each, the left and middle values of the
// recorded triple are compared with the expected interval constant.
// NOTE(review): left/middle are presumably the initial delay and repeat delay
// passed to scheduleWithFixedDelay - confirm in SupportBasicStateTester.
284 public void testAddTimers_WithPredecessor() {
285 // invoke start() to add the timers
288 assertEquals(3, repeatedSchedules.size());
290 Triple<Long, Long, StateTimerTask> timer;
292 // heart beat generator
293 timer = repeatedTasks.remove();
294 assertEquals(STD_INTER_HEARTBEAT_MS, timer.getLeft().longValue());
295 assertEquals(STD_INTER_HEARTBEAT_MS, timer.getMiddle().longValue());
297 // my heart beat checker
298 timer = repeatedTasks.remove();
299 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getLeft().longValue());
300 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getMiddle().longValue());
302 // predecessor's heart beat checker
303 timer = repeatedTasks.remove();
304 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getLeft().longValue());
305 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getMiddle().longValue());
// With MY_HOST as the only assigned host, expects just two repeated timers,
// consumed in order: heartbeat generator, then own-heartbeat checker. No
// predecessor checker is expected.
309 public void testAddTimers_SansPredecessor() {
310 // only one host, thus no predecessor
311 mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST, MY_HOST}));
312 state = new ActiveState(mgr);
314 // invoke start() to add the timers
317 assertEquals(2, repeatedSchedules.size());
319 Triple<Long, Long, StateTimerTask> timer;
321 // heart beat generator
322 timer = repeatedTasks.remove();
323 assertEquals(STD_INTER_HEARTBEAT_MS, timer.getLeft().longValue());
324 assertEquals(STD_INTER_HEARTBEAT_MS, timer.getMiddle().longValue());
326 // my heart beat checker
327 timer = repeatedTasks.remove();
328 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getLeft().longValue());
329 assertEquals(STD_ACTIVE_HEARTBEAT_MS, timer.getMiddle().longValue());
// Verifies the heartbeat-generator timer task. With a single assigned host, one
// Heartbeat has been published before the task fires. Firing the task returns
// null (no state change) and raises the publish count to two. The captured
// message is addressed to MY_HOST and sourced from MY_HOST.
333 public void testAddTimers_HeartbeatGenerator() {
334 // only one host so we only have to look at one heart beat at a time
335 mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
336 state = new ActiveState(mgr);
338 // invoke start() to add the timers
// the first repeated task is the one under test here
341 Triple<Long, Long, StateTimerTask> task = repeatedTasks.remove();
// exactly one heart beat so far, before the task is fired
343 verify(mgr).publish(anyString(), any(Heartbeat.class));
346 assertNull(task.getRight().fire());
348 // should have generated a second pair of heart beats
349 verify(mgr, times(2)).publish(anyString(), any(Heartbeat.class));
351 Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
352 assertEquals(MY_HOST, msg.getLeft());
353 assertEquals(MY_HOST, msg.getRight().getSource());
// Verifies the own-heartbeat checker (repeated task at index 1) when this host's
// heartbeat has been processed: firing the task returns null and no Query admin
// message is published.
357 public void testAddTimers_MyHeartbeatSeen() {
358 // invoke start() to add the timers
361 Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
363 // indicate that this host is still alive
364 state.process(new Heartbeat(MY_HOST, 0L));
// stub goInactive() so an unexpected transition would surface as a non-null result
367 State next = mock(State.class);
368 when(mgr.goInactive()).thenReturn(next);
370 // fire the task - should not transition
371 assertNull(task.getRight().fire());
373 verify(mgr, never()).publishAdmin(any(Query.class));
// Verifies the own-heartbeat checker (repeated task at index 1) when no own
// heartbeat has been processed: firing the task returns whatever goStart()
// yields, startDistributing(null) is never invoked, and an Offline admin message
// sourced from MY_HOST is published.
377 public void testAddTimers_MyHeartbeatMissed() {
378 // invoke start() to add the timers
381 Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(1);
// stub the expected transition target
384 State next = mock(State.class);
385 when(mgr.goStart()).thenReturn(next);
387 // fire the task - should transition
388 assertEquals(next, task.getRight().fire());
390 // should continue to distribute
391 verify(mgr, never()).startDistributing(null);
393 // should publish an offline message
394 Offline msg = captureAdminMessage(Offline.class);
395 assertEquals(MY_HOST, msg.getSource());
// Verifies the predecessor-heartbeat checker (repeated task at index 2) when a
// heartbeat from HOST2 has been processed: firing the task returns null and no
// Query admin message is published.
399 public void testAddTimers_PredecessorHeartbeatSeen() {
400 // invoke start() to add the timers
403 Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
405 // indicate that the predecessor is still alive
406 state.process(new Heartbeat(HOST2, 0L));
408 // set up next state, just in case
409 State next = mock(State.class);
410 when(mgr.goQuery()).thenReturn(next);
412 // fire the task - should NOT transition
413 assertNull(task.getRight().fire());
415 verify(mgr, never()).publishAdmin(any(Query.class));
// Verifies the predecessor-heartbeat checker (repeated task at index 2) when no
// predecessor heartbeat has been processed: firing the task returns whatever
// goQuery() yields and a Query admin message is published.
419 public void testAddTimers_PredecessorHeartbeatMissed() {
420 // invoke start() to add the timers
423 Triple<Long, Long, StateTimerTask> task = repeatedTasks.get(2);
// stub the expected transition target
426 State next = mock(State.class);
427 when(mgr.goQuery()).thenReturn(next);
429 // fire the task - should transition
430 assertEquals(next, task.getRight().fire());
432 verify(mgr).publishAdmin(any(Query.class));
// With MY_HOST as the only assigned host, expects exactly one publish() call.
// The captured Heartbeat is addressed to MY_HOST and sourced from MY_HOST.
436 public void testGenHeartbeat_OneHost() {
437 // only one host (i.e., itself)
438 mgr.startDistributing(new BucketAssignments(new String[] {MY_HOST}));
439 state = new ActiveState(mgr);
443 verify(mgr, times(1)).publish(any(), any());
445 Pair<String, Heartbeat> msg = capturePublishedMessage(Heartbeat.class);
446 assertEquals(MY_HOST, msg.getLeft());
447 assertEquals(MY_HOST, msg.getRight().getSource());
// With the default multi-host fixture, expects exactly two publish() calls. The
// captured Heartbeats, read in order via "index", are addressed first to MY_HOST
// and then to HOST1 (the successor). Both are sourced from MY_HOST.
451 public void testGenHeartbeat_MultipleHosts() {
454 verify(mgr, times(2)).publish(any(), any());
456 Pair<String, Heartbeat> msg;
459 // this message should go to itself
460 msg = capturePublishedMessage(Heartbeat.class, index++);
461 assertEquals(MY_HOST, msg.getLeft());
462 assertEquals(MY_HOST, msg.getRight().getSource());
464 // this message should go to its successor
465 msg = capturePublishedMessage(Heartbeat.class, index++);
466 assertEquals(HOST1, msg.getLeft());
467 assertEquals(MY_HOST, msg.getRight().getSource());