2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling.state;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.onap.policy.drools.pooling.CancellableScheduledTask;
37 import org.onap.policy.drools.pooling.PoolingManager;
38 import org.onap.policy.drools.pooling.message.BucketAssignments;
39 import org.onap.policy.drools.pooling.message.Heartbeat;
40 import org.onap.policy.drools.pooling.message.Identification;
41 import org.onap.policy.drools.pooling.message.Leader;
42 import org.onap.policy.drools.pooling.message.Offline;
43 import org.onap.policy.drools.pooling.message.Query;
45 public class StateTest extends SupportBasicStateTester {
53 public void setUp() throws Exception {
56 state = new MyState(mgr);
60 public void testStatePoolingManager() {
62 * Prove the state is attached to the manager by invoking getHost(), which
63 * delegates to the manager.
65 assertEquals(MY_HOST, state.getHost());
69 public void testStateState() {
70 // allocate a new state, copying from the old state
71 state = new MyState(mgr);
74 * Prove the state is attached to the manager by invoking getHost(), which
75 * delegates to the manager.
77 assertEquals(MY_HOST, state.getHost());
81 public void testCancelTimers() {
86 * Create three tasks tasks.
89 StateTimerTask task1 = mock(StateTimerTask.class);
90 StateTimerTask task2 = mock(StateTimerTask.class);
91 StateTimerTask task3 = mock(StateTimerTask.class);
93 // two tasks via schedule()
94 state.schedule(delay, task1);
95 state.schedule(delay, task2);
97 // one task via scheduleWithFixedDelay()
98 state.scheduleWithFixedDelay(initDelay, delay, task3);
100 // ensure all were scheduled, but not yet canceled
101 verify(mgr).schedule(delay, task1);
102 verify(mgr).schedule(delay, task2);
103 verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3);
105 CancellableScheduledTask sched1 = onceSchedules.removeFirst();
106 CancellableScheduledTask sched2 = onceSchedules.removeFirst();
107 CancellableScheduledTask sched3 = repeatedSchedules.removeFirst();
109 verify(sched1, never()).cancel();
110 verify(sched2, never()).cancel();
111 verify(sched3, never()).cancel();
116 state.cancelTimers();
118 // verify that all were cancelled
119 verify(sched1).cancel();
120 verify(sched2).cancel();
121 verify(sched3).cancel();
125 public void testStart() {
126 assertThatCode(() -> state.start()).doesNotThrowAnyException();
130 public void testGoStart() {
131 State next = mock(State.class);
132 when(mgr.goStart()).thenReturn(next);
134 State next2 = state.goStart();
135 assertEquals(next, next2);
139 public void testGoQuery() {
140 State next = mock(State.class);
141 when(mgr.goQuery()).thenReturn(next);
143 State next2 = state.goQuery();
144 assertEquals(next, next2);
148 public void testGoActive_WithAssignment() {
149 State act = mock(State.class);
150 State inact = mock(State.class);
152 when(mgr.goActive()).thenReturn(act);
153 when(mgr.goInactive()).thenReturn(inact);
155 String[] arr = {HOST2, PREV_HOST, MY_HOST};
156 BucketAssignments asgn = new BucketAssignments(arr);
158 assertEquals(act, state.goActive(asgn));
160 verify(mgr).startDistributing(asgn);
164 public void testGoActive_WithoutAssignment() {
165 State act = mock(State.class);
166 State inact = mock(State.class);
168 when(mgr.goActive()).thenReturn(act);
169 when(mgr.goInactive()).thenReturn(inact);
171 String[] arr = {HOST2, PREV_HOST};
172 BucketAssignments asgn = new BucketAssignments(arr);
174 assertEquals(inact, state.goActive(asgn));
176 verify(mgr).startDistributing(asgn);
180 public void testGoActive_NullAssignment() {
181 State act = mock(State.class);
182 State inact = mock(State.class);
184 when(mgr.goActive()).thenReturn(act);
185 when(mgr.goInactive()).thenReturn(inact);
187 assertEquals(inact, state.goActive(null));
189 verify(mgr, never()).startDistributing(any());
193 public void testGoInactive() {
194 State next = mock(State.class);
195 when(mgr.goInactive()).thenReturn(next);
197 State next2 = state.goInactive();
198 assertEquals(next, next2);
202 public void testProcessHeartbeat() {
203 assertNull(state.process(new Heartbeat()));
207 public void testProcessIdentification() {
208 assertNull(state.process(new Identification()));
212 public void testProcessLeader() {
213 String[] arr = {HOST2, HOST1};
214 BucketAssignments asgn = new BucketAssignments(arr);
215 Leader msg = new Leader(HOST1, asgn);
218 assertEquals(null, state.process(msg));
219 verify(mgr).startDistributing(asgn);
223 public void testProcessLeader_Invalid() {
224 Leader msg = new Leader(PREV_HOST, null);
226 // should stay in the same state, and not start distributing
227 assertNull(state.process(msg));
228 verify(mgr, never()).startDistributing(any());
229 verify(mgr, never()).goActive();
230 verify(mgr, never()).goInactive();
234 public void testIsValidLeader_NullAssignment() {
235 assertFalse(state.isValid(new Leader(PREV_HOST, null)));
239 public void testIsValidLeader_NullSource() {
240 String[] arr = {HOST2, PREV_HOST, MY_HOST};
241 BucketAssignments asgn = new BucketAssignments(arr);
242 assertFalse(state.isValid(new Leader(null, asgn)));
246 public void testIsValidLeader_EmptyAssignment() {
247 assertFalse(state.isValid(new Leader(PREV_HOST, new BucketAssignments())));
251 public void testIsValidLeader_FromSelf() {
252 String[] arr = {HOST2, MY_HOST};
253 BucketAssignments asgn = new BucketAssignments(arr);
255 assertFalse(state.isValid(new Leader(MY_HOST, asgn)));
259 public void testIsValidLeader_WrongLeader() {
260 String[] arr = {HOST2, HOST3};
261 BucketAssignments asgn = new BucketAssignments(arr);
263 assertFalse(state.isValid(new Leader(HOST1, asgn)));
267 public void testIsValidLeader() {
268 String[] arr = {HOST2, HOST1};
269 BucketAssignments asgn = new BucketAssignments(arr);
271 assertTrue(state.isValid(new Leader(HOST1, asgn)));
275 public void testProcessOffline() {
276 assertNull(state.process(new Offline()));
280 public void testProcessQuery() {
281 assertNull(state.process(new Query()));
285 public void testPublishIdentification() {
286 Identification msg = new Identification();
289 verify(mgr).publishAdmin(msg);
293 public void testPublishLeader() {
294 Leader msg = new Leader();
297 verify(mgr).publishAdmin(msg);
301 public void testPublishOffline() {
302 Offline msg = new Offline();
305 verify(mgr).publishAdmin(msg);
309 public void testPublishQuery() {
310 Query msg = new Query();
313 verify(mgr).publishAdmin(msg);
317 public void testPublishStringHeartbeat() {
318 String chnl = "channelH";
319 Heartbeat msg = new Heartbeat();
321 state.publish(chnl, msg);
323 verify(mgr).publish(chnl, msg);
327 public void testStartDistributing() {
328 BucketAssignments asgn = new BucketAssignments();
329 state.startDistributing(asgn);
331 verify(mgr).startDistributing(asgn);
335 public void testStartDistributing_NullAssignments() {
336 state.startDistributing(null);
338 verify(mgr, never()).startDistributing(any());
342 public void testSchedule() {
345 StateTimerTask task = mock(StateTimerTask.class);
347 state.schedule(delay, task);
349 CancellableScheduledTask sched = onceSchedules.removeFirst();
351 // scheduled, but not canceled yet
352 verify(mgr).schedule(delay, task);
353 verify(sched, never()).cancel();
356 * Ensure the state added the timer to its list by telling it to cancel its timers
357 * and then seeing if this timer was canceled.
359 state.cancelTimers();
360 verify(sched).cancel();
364 public void testScheduleWithFixedDelay() {
368 StateTimerTask task = mock(StateTimerTask.class);
370 state.scheduleWithFixedDelay(initdel, delay, task);
372 CancellableScheduledTask sched = repeatedSchedules.removeFirst();
374 // scheduled, but not canceled yet
375 verify(mgr).scheduleWithFixedDelay(initdel, delay, task);
376 verify(sched, never()).cancel();
379 * Ensure the state added the timer to its list by telling it to cancel its timers
380 * and then seeing if this timer was canceled.
382 state.cancelTimers();
383 verify(sched).cancel();
387 public void testMissedHeartbeat() {
388 State next = mock(State.class);
389 when(mgr.goStart()).thenReturn(next);
391 State next2 = state.missedHeartbeat();
392 assertEquals(next, next2);
394 // should continue to distribute
395 verify(mgr, never()).startDistributing(null);
397 Offline msg = captureAdminMessage(Offline.class);
398 assertEquals(MY_HOST, msg.getSource());
402 public void testInternalTopicFailed() {
403 State next = mock(State.class);
404 when(mgr.goInactive()).thenReturn(next);
406 State next2 = state.internalTopicFailed();
407 assertEquals(next, next2);
409 // should stop distributing
410 verify(mgr).startDistributing(null);
412 Offline msg = captureAdminMessage(Offline.class);
413 assertEquals(MY_HOST, msg.getSource());
417 public void testMakeHeartbeat() {
418 long timestamp = 30000L;
419 Heartbeat msg = state.makeHeartbeat(timestamp);
421 assertEquals(MY_HOST, msg.getSource());
422 assertEquals(timestamp, msg.getTimestampMs());
426 public void testMakeIdentification() {
427 Identification ident = state.makeIdentification();
428 assertEquals(MY_HOST, ident.getSource());
429 assertEquals(ASGN3, ident.getAssignments());
433 public void testMakeOffline() {
434 Offline msg = state.makeOffline();
436 assertEquals(MY_HOST, msg.getSource());
440 public void testMakeQuery() {
441 Query msg = state.makeQuery();
443 assertEquals(MY_HOST, msg.getSource());
447 public void testGetHost() {
448 assertEquals(MY_HOST, state.getHost());
452 public void testGetTopic() {
453 assertEquals(MY_TOPIC, state.getTopic());
457 * State used for testing purposes, with abstract methods implemented.
459 private class MyState extends State {
461 public MyState(PoolingManager mgr) {