2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling.state;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.junit.jupiter.api.Assertions.assertEquals;
26 import static org.junit.jupiter.api.Assertions.assertFalse;
27 import static org.junit.jupiter.api.Assertions.assertNull;
28 import static org.junit.jupiter.api.Assertions.assertTrue;
29 import static org.mockito.ArgumentMatchers.any;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.never;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
35 import org.junit.jupiter.api.BeforeEach;
36 import org.junit.jupiter.api.Test;
37 import org.onap.policy.drools.pooling.CancellableScheduledTask;
38 import org.onap.policy.drools.pooling.PoolingManager;
39 import org.onap.policy.drools.pooling.message.BucketAssignments;
40 import org.onap.policy.drools.pooling.message.Heartbeat;
41 import org.onap.policy.drools.pooling.message.Identification;
42 import org.onap.policy.drools.pooling.message.Leader;
43 import org.onap.policy.drools.pooling.message.Offline;
44 import org.onap.policy.drools.pooling.message.Query;
46 class StateTest extends SupportBasicStateTester {
54 public void setUp() throws Exception {
57 state = new MyState(mgr);
61 void testStatePoolingManager() {
63 * Prove the state is attached to the manager by invoking getHost(), which
64 * delegates to the manager.
66 assertEquals(MY_HOST, state.getHost());
70 void testStateState() {
71 // allocate a new state, copying from the old state
72 state = new MyState(mgr);
75 * Prove the state is attached to the manager by invoking getHost(), which
76 * delegates to the manager.
78 assertEquals(MY_HOST, state.getHost());
82 void testCancelTimers() {
87 * Create three tasks tasks.
90 StateTimerTask task1 = mock(StateTimerTask.class);
91 StateTimerTask task2 = mock(StateTimerTask.class);
92 StateTimerTask task3 = mock(StateTimerTask.class);
94 // two tasks via schedule()
95 state.schedule(delay, task1);
96 state.schedule(delay, task2);
98 // one task via scheduleWithFixedDelay()
99 state.scheduleWithFixedDelay(initDelay, delay, task3);
101 // ensure all were scheduled, but not yet canceled
102 verify(mgr).schedule(delay, task1);
103 verify(mgr).schedule(delay, task2);
104 verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3);
106 CancellableScheduledTask sched1 = onceSchedules.removeFirst();
107 CancellableScheduledTask sched2 = onceSchedules.removeFirst();
108 CancellableScheduledTask sched3 = repeatedSchedules.removeFirst();
110 verify(sched1, never()).cancel();
111 verify(sched2, never()).cancel();
112 verify(sched3, never()).cancel();
117 state.cancelTimers();
119 // verify that all were cancelled
120 verify(sched1).cancel();
121 verify(sched2).cancel();
122 verify(sched3).cancel();
127 assertThatCode(() -> state.start()).doesNotThrowAnyException();
132 State next = mock(State.class);
133 when(mgr.goStart()).thenReturn(next);
135 State next2 = state.goStart();
136 assertEquals(next, next2);
141 State next = mock(State.class);
142 when(mgr.goQuery()).thenReturn(next);
144 State next2 = state.goQuery();
145 assertEquals(next, next2);
149 void testGoActive_WithAssignment() {
150 State act = mock(State.class);
151 State inact = mock(State.class);
153 when(mgr.goActive()).thenReturn(act);
154 when(mgr.goInactive()).thenReturn(inact);
156 String[] arr = {HOST2, PREV_HOST, MY_HOST};
157 BucketAssignments asgn = new BucketAssignments(arr);
159 assertEquals(act, state.goActive(asgn));
161 verify(mgr).startDistributing(asgn);
165 void testGoActive_WithoutAssignment() {
166 State act = mock(State.class);
167 State inact = mock(State.class);
169 when(mgr.goActive()).thenReturn(act);
170 when(mgr.goInactive()).thenReturn(inact);
172 String[] arr = {HOST2, PREV_HOST};
173 BucketAssignments asgn = new BucketAssignments(arr);
175 assertEquals(inact, state.goActive(asgn));
177 verify(mgr).startDistributing(asgn);
181 void testGoActive_NullAssignment() {
182 State act = mock(State.class);
183 State inact = mock(State.class);
185 when(mgr.goActive()).thenReturn(act);
186 when(mgr.goInactive()).thenReturn(inact);
188 assertEquals(inact, state.goActive(null));
190 verify(mgr, never()).startDistributing(any());
194 void testGoInactive() {
195 State next = mock(State.class);
196 when(mgr.goInactive()).thenReturn(next);
198 State next2 = state.goInactive();
199 assertEquals(next, next2);
203 void testProcessHeartbeat() {
204 assertNull(state.process(new Heartbeat()));
208 void testProcessIdentification() {
209 assertNull(state.process(new Identification()));
213 void testProcessLeader() {
214 String[] arr = {HOST2, HOST1};
215 BucketAssignments asgn = new BucketAssignments(arr);
216 Leader msg = new Leader(HOST1, asgn);
219 assertNull(state.process(msg));
220 verify(mgr).startDistributing(asgn);
224 void testProcessLeader_Invalid() {
225 Leader msg = new Leader(PREV_HOST, null);
227 // should stay in the same state, and not start distributing
228 assertNull(state.process(msg));
229 verify(mgr, never()).startDistributing(any());
230 verify(mgr, never()).goActive();
231 verify(mgr, never()).goInactive();
235 void testIsValidLeader_NullAssignment() {
236 assertFalse(state.isValid(new Leader(PREV_HOST, null)));
240 void testIsValidLeader_NullSource() {
241 String[] arr = {HOST2, PREV_HOST, MY_HOST};
242 BucketAssignments asgn = new BucketAssignments(arr);
243 assertFalse(state.isValid(new Leader(null, asgn)));
247 void testIsValidLeader_EmptyAssignment() {
248 assertFalse(state.isValid(new Leader(PREV_HOST, new BucketAssignments())));
252 void testIsValidLeader_FromSelf() {
253 String[] arr = {HOST2, MY_HOST};
254 BucketAssignments asgn = new BucketAssignments(arr);
256 assertFalse(state.isValid(new Leader(MY_HOST, asgn)));
260 void testIsValidLeader_WrongLeader() {
261 String[] arr = {HOST2, HOST3};
262 BucketAssignments asgn = new BucketAssignments(arr);
264 assertFalse(state.isValid(new Leader(HOST1, asgn)));
268 void testIsValidLeader() {
269 String[] arr = {HOST2, HOST1};
270 BucketAssignments asgn = new BucketAssignments(arr);
272 assertTrue(state.isValid(new Leader(HOST1, asgn)));
276 void testProcessOffline() {
277 assertNull(state.process(new Offline()));
281 void testProcessQuery() {
282 assertNull(state.process(new Query()));
286 void testPublishIdentification() {
287 Identification msg = new Identification();
290 verify(mgr).publishAdmin(msg);
294 void testPublishLeader() {
295 Leader msg = new Leader();
298 verify(mgr).publishAdmin(msg);
302 void testPublishOffline() {
303 Offline msg = new Offline();
306 verify(mgr).publishAdmin(msg);
310 void testPublishQuery() {
311 Query msg = new Query();
314 verify(mgr).publishAdmin(msg);
318 void testPublishStringHeartbeat() {
319 String chnl = "channelH";
320 Heartbeat msg = new Heartbeat();
322 state.publish(chnl, msg);
324 verify(mgr).publish(chnl, msg);
328 void testStartDistributing() {
329 BucketAssignments asgn = new BucketAssignments();
330 state.startDistributing(asgn);
332 verify(mgr).startDistributing(asgn);
336 void testStartDistributing_NullAssignments() {
337 state.startDistributing(null);
339 verify(mgr, never()).startDistributing(any());
343 void testSchedule() {
346 StateTimerTask task = mock(StateTimerTask.class);
348 state.schedule(delay, task);
350 CancellableScheduledTask sched = onceSchedules.removeFirst();
352 // scheduled, but not canceled yet
353 verify(mgr).schedule(delay, task);
354 verify(sched, never()).cancel();
357 * Ensure the state added the timer to its list by telling it to cancel its timers
358 * and then seeing if this timer was canceled.
360 state.cancelTimers();
361 verify(sched).cancel();
365 void testScheduleWithFixedDelay() {
369 StateTimerTask task = mock(StateTimerTask.class);
371 state.scheduleWithFixedDelay(initdel, delay, task);
373 CancellableScheduledTask sched = repeatedSchedules.removeFirst();
375 // scheduled, but not canceled yet
376 verify(mgr).scheduleWithFixedDelay(initdel, delay, task);
377 verify(sched, never()).cancel();
380 * Ensure the state added the timer to its list by telling it to cancel its timers
381 * and then seeing if this timer was canceled.
383 state.cancelTimers();
384 verify(sched).cancel();
388 void testMissedHeartbeat() {
389 State next = mock(State.class);
390 when(mgr.goStart()).thenReturn(next);
392 State next2 = state.missedHeartbeat();
393 assertEquals(next, next2);
395 // should continue to distribute
396 verify(mgr, never()).startDistributing(null);
398 Offline msg = captureAdminMessage(Offline.class);
399 assertEquals(MY_HOST, msg.getSource());
403 void testInternalTopicFailed() {
404 State next = mock(State.class);
405 when(mgr.goInactive()).thenReturn(next);
407 State next2 = state.internalTopicFailed();
408 assertEquals(next, next2);
410 // should stop distributing
411 verify(mgr).startDistributing(null);
413 Offline msg = captureAdminMessage(Offline.class);
414 assertEquals(MY_HOST, msg.getSource());
418 void testMakeHeartbeat() {
419 long timestamp = 30000L;
420 Heartbeat msg = state.makeHeartbeat(timestamp);
422 assertEquals(MY_HOST, msg.getSource());
423 assertEquals(timestamp, msg.getTimestampMs());
427 void testMakeIdentification() {
428 Identification ident = state.makeIdentification();
429 assertEquals(MY_HOST, ident.getSource());
430 assertEquals(ASGN3, ident.getAssignments());
434 void testMakeOffline() {
435 Offline msg = state.makeOffline();
437 assertEquals(MY_HOST, msg.getSource());
441 void testMakeQuery() {
442 Query msg = state.makeQuery();
444 assertEquals(MY_HOST, msg.getSource());
449 assertEquals(MY_HOST, state.getHost());
453 void testGetTopic() {
454 assertEquals(MY_TOPIC, state.getTopic());
458 * State used for testing purposes, with abstract methods implemented.
460 private static class MyState extends State {
462 public MyState(PoolingManager mgr) {