Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / test / java / org / onap / policy / drools / pooling / state / StateTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.junit.jupiter.api.Assertions.assertEquals;
26 import static org.junit.jupiter.api.Assertions.assertFalse;
27 import static org.junit.jupiter.api.Assertions.assertNull;
28 import static org.junit.jupiter.api.Assertions.assertTrue;
29 import static org.mockito.ArgumentMatchers.any;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.never;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import org.junit.jupiter.api.BeforeEach;
36 import org.junit.jupiter.api.Test;
37 import org.onap.policy.drools.pooling.CancellableScheduledTask;
38 import org.onap.policy.drools.pooling.PoolingManager;
39 import org.onap.policy.drools.pooling.message.BucketAssignments;
40 import org.onap.policy.drools.pooling.message.Heartbeat;
41 import org.onap.policy.drools.pooling.message.Identification;
42 import org.onap.policy.drools.pooling.message.Leader;
43 import org.onap.policy.drools.pooling.message.Offline;
44 import org.onap.policy.drools.pooling.message.Query;
45
46 class StateTest extends SupportBasicStateTester {
47
48     private State state;
49
50     /**
51      * Setup.
52      */
53     @BeforeEach
54     public void setUp() throws Exception {
55         super.setUp();
56
57         state = new MyState(mgr);
58     }
59
60     @Test
61     void testStatePoolingManager() {
62         /*
63          * Prove the state is attached to the manager by invoking getHost(), which
64          * delegates to the manager.
65          */
66         assertEquals(MY_HOST, state.getHost());
67     }
68
69     @Test
70     void testStateState() {
71         // allocate a new state, copying from the old state
72         state = new MyState(mgr);
73
74         /*
75          * Prove the state is attached to the manager by invoking getHost(), which
76          * delegates to the manager.
77          */
78         assertEquals(MY_HOST, state.getHost());
79     }
80
81     @Test
82     void testCancelTimers() {
83         int delay = 100;
84         int initDelay = 200;
85
86         /*
87          * Create three tasks tasks.
88          */
89
90         StateTimerTask task1 = mock(StateTimerTask.class);
91         StateTimerTask task2 = mock(StateTimerTask.class);
92         StateTimerTask task3 = mock(StateTimerTask.class);
93
94         // two tasks via schedule()
95         state.schedule(delay, task1);
96         state.schedule(delay, task2);
97
98         // one task via scheduleWithFixedDelay()
99         state.scheduleWithFixedDelay(initDelay, delay, task3);
100
101         // ensure all were scheduled, but not yet canceled
102         verify(mgr).schedule(delay, task1);
103         verify(mgr).schedule(delay, task2);
104         verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3);
105
106         CancellableScheduledTask sched1 = onceSchedules.removeFirst();
107         CancellableScheduledTask sched2 = onceSchedules.removeFirst();
108         CancellableScheduledTask sched3 = repeatedSchedules.removeFirst();
109
110         verify(sched1, never()).cancel();
111         verify(sched2, never()).cancel();
112         verify(sched3, never()).cancel();
113
114         /*
115          * Cancel the timers.
116          */
117         state.cancelTimers();
118
119         // verify that all were cancelled
120         verify(sched1).cancel();
121         verify(sched2).cancel();
122         verify(sched3).cancel();
123     }
124
125     @Test
126     void testStart() {
127         assertThatCode(() -> state.start()).doesNotThrowAnyException();
128     }
129
130     @Test
131     void testGoStart() {
132         State next = mock(State.class);
133         when(mgr.goStart()).thenReturn(next);
134
135         State next2 = state.goStart();
136         assertEquals(next, next2);
137     }
138
139     @Test
140     void testGoQuery() {
141         State next = mock(State.class);
142         when(mgr.goQuery()).thenReturn(next);
143
144         State next2 = state.goQuery();
145         assertEquals(next, next2);
146     }
147
148     @Test
149     void testGoActive_WithAssignment() {
150         State act = mock(State.class);
151         State inact = mock(State.class);
152
153         when(mgr.goActive()).thenReturn(act);
154         when(mgr.goInactive()).thenReturn(inact);
155
156         String[] arr = {HOST2, PREV_HOST, MY_HOST};
157         BucketAssignments asgn = new BucketAssignments(arr);
158
159         assertEquals(act, state.goActive(asgn));
160
161         verify(mgr).startDistributing(asgn);
162     }
163
164     @Test
165     void testGoActive_WithoutAssignment() {
166         State act = mock(State.class);
167         State inact = mock(State.class);
168
169         when(mgr.goActive()).thenReturn(act);
170         when(mgr.goInactive()).thenReturn(inact);
171
172         String[] arr = {HOST2, PREV_HOST};
173         BucketAssignments asgn = new BucketAssignments(arr);
174
175         assertEquals(inact, state.goActive(asgn));
176
177         verify(mgr).startDistributing(asgn);
178     }
179
180     @Test
181     void testGoActive_NullAssignment() {
182         State act = mock(State.class);
183         State inact = mock(State.class);
184
185         when(mgr.goActive()).thenReturn(act);
186         when(mgr.goInactive()).thenReturn(inact);
187
188         assertEquals(inact, state.goActive(null));
189
190         verify(mgr, never()).startDistributing(any());
191     }
192
193     @Test
194     void testGoInactive() {
195         State next = mock(State.class);
196         when(mgr.goInactive()).thenReturn(next);
197
198         State next2 = state.goInactive();
199         assertEquals(next, next2);
200     }
201
202     @Test
203     void testProcessHeartbeat() {
204         assertNull(state.process(new Heartbeat()));
205     }
206
207     @Test
208     void testProcessIdentification() {
209         assertNull(state.process(new Identification()));
210     }
211
212     @Test
213     void testProcessLeader() {
214         String[] arr = {HOST2, HOST1};
215         BucketAssignments asgn = new BucketAssignments(arr);
216         Leader msg = new Leader(HOST1, asgn);
217
218         // should ignore it
219         assertNull(state.process(msg));
220         verify(mgr).startDistributing(asgn);
221     }
222
223     @Test
224     void testProcessLeader_Invalid() {
225         Leader msg = new Leader(PREV_HOST, null);
226
227         // should stay in the same state, and not start distributing
228         assertNull(state.process(msg));
229         verify(mgr, never()).startDistributing(any());
230         verify(mgr, never()).goActive();
231         verify(mgr, never()).goInactive();
232     }
233
234     @Test
235     void testIsValidLeader_NullAssignment() {
236         assertFalse(state.isValid(new Leader(PREV_HOST, null)));
237     }
238
239     @Test
240     void testIsValidLeader_NullSource() {
241         String[] arr = {HOST2, PREV_HOST, MY_HOST};
242         BucketAssignments asgn = new BucketAssignments(arr);
243         assertFalse(state.isValid(new Leader(null, asgn)));
244     }
245
246     @Test
247     void testIsValidLeader_EmptyAssignment() {
248         assertFalse(state.isValid(new Leader(PREV_HOST, new BucketAssignments())));
249     }
250
251     @Test
252     void testIsValidLeader_FromSelf() {
253         String[] arr = {HOST2, MY_HOST};
254         BucketAssignments asgn = new BucketAssignments(arr);
255
256         assertFalse(state.isValid(new Leader(MY_HOST, asgn)));
257     }
258
259     @Test
260     void testIsValidLeader_WrongLeader() {
261         String[] arr = {HOST2, HOST3};
262         BucketAssignments asgn = new BucketAssignments(arr);
263
264         assertFalse(state.isValid(new Leader(HOST1, asgn)));
265     }
266
267     @Test
268     void testIsValidLeader() {
269         String[] arr = {HOST2, HOST1};
270         BucketAssignments asgn = new BucketAssignments(arr);
271
272         assertTrue(state.isValid(new Leader(HOST1, asgn)));
273     }
274
275     @Test
276     void testProcessOffline() {
277         assertNull(state.process(new Offline()));
278     }
279
280     @Test
281     void testProcessQuery() {
282         assertNull(state.process(new Query()));
283     }
284
285     @Test
286     void testPublishIdentification() {
287         Identification msg = new Identification();
288         state.publish(msg);
289
290         verify(mgr).publishAdmin(msg);
291     }
292
293     @Test
294     void testPublishLeader() {
295         Leader msg = new Leader();
296         state.publish(msg);
297
298         verify(mgr).publishAdmin(msg);
299     }
300
301     @Test
302     void testPublishOffline() {
303         Offline msg = new Offline();
304         state.publish(msg);
305
306         verify(mgr).publishAdmin(msg);
307     }
308
309     @Test
310     void testPublishQuery() {
311         Query msg = new Query();
312         state.publish(msg);
313
314         verify(mgr).publishAdmin(msg);
315     }
316
317     @Test
318     void testPublishStringHeartbeat() {
319         String chnl = "channelH";
320         Heartbeat msg = new Heartbeat();
321
322         state.publish(chnl, msg);
323
324         verify(mgr).publish(chnl, msg);
325     }
326
327     @Test
328     void testStartDistributing() {
329         BucketAssignments asgn = new BucketAssignments();
330         state.startDistributing(asgn);
331
332         verify(mgr).startDistributing(asgn);
333     }
334
335     @Test
336     void testStartDistributing_NullAssignments() {
337         state.startDistributing(null);
338
339         verify(mgr, never()).startDistributing(any());
340     }
341
342     @Test
343     void testSchedule() {
344         int delay = 100;
345
346         StateTimerTask task = mock(StateTimerTask.class);
347
348         state.schedule(delay, task);
349
350         CancellableScheduledTask sched = onceSchedules.removeFirst();
351
352         // scheduled, but not canceled yet
353         verify(mgr).schedule(delay, task);
354         verify(sched, never()).cancel();
355
356         /*
357          * Ensure the state added the timer to its list by telling it to cancel its timers
358          * and then seeing if this timer was canceled.
359          */
360         state.cancelTimers();
361         verify(sched).cancel();
362     }
363
364     @Test
365     void testScheduleWithFixedDelay() {
366         int initdel = 100;
367         int delay = 200;
368
369         StateTimerTask task = mock(StateTimerTask.class);
370
371         state.scheduleWithFixedDelay(initdel, delay, task);
372
373         CancellableScheduledTask sched = repeatedSchedules.removeFirst();
374
375         // scheduled, but not canceled yet
376         verify(mgr).scheduleWithFixedDelay(initdel, delay, task);
377         verify(sched, never()).cancel();
378
379         /*
380          * Ensure the state added the timer to its list by telling it to cancel its timers
381          * and then seeing if this timer was canceled.
382          */
383         state.cancelTimers();
384         verify(sched).cancel();
385     }
386
387     @Test
388     void testMissedHeartbeat() {
389         State next = mock(State.class);
390         when(mgr.goStart()).thenReturn(next);
391
392         State next2 = state.missedHeartbeat();
393         assertEquals(next, next2);
394
395         // should continue to distribute
396         verify(mgr, never()).startDistributing(null);
397
398         Offline msg = captureAdminMessage(Offline.class);
399         assertEquals(MY_HOST, msg.getSource());
400     }
401
402     @Test
403     void testInternalTopicFailed() {
404         State next = mock(State.class);
405         when(mgr.goInactive()).thenReturn(next);
406
407         State next2 = state.internalTopicFailed();
408         assertEquals(next, next2);
409
410         // should stop distributing
411         verify(mgr).startDistributing(null);
412
413         Offline msg = captureAdminMessage(Offline.class);
414         assertEquals(MY_HOST, msg.getSource());
415     }
416
417     @Test
418     void testMakeHeartbeat() {
419         long timestamp = 30000L;
420         Heartbeat msg = state.makeHeartbeat(timestamp);
421
422         assertEquals(MY_HOST, msg.getSource());
423         assertEquals(timestamp, msg.getTimestampMs());
424     }
425
426     @Test
427     void testMakeIdentification() {
428         Identification ident = state.makeIdentification();
429         assertEquals(MY_HOST, ident.getSource());
430         assertEquals(ASGN3, ident.getAssignments());
431     }
432
433     @Test
434     void testMakeOffline() {
435         Offline msg = state.makeOffline();
436
437         assertEquals(MY_HOST, msg.getSource());
438     }
439
440     @Test
441     void testMakeQuery() {
442         Query msg = state.makeQuery();
443
444         assertEquals(MY_HOST, msg.getSource());
445     }
446
447     @Test
448     void testGetHost() {
449         assertEquals(MY_HOST, state.getHost());
450     }
451
452     @Test
453     void testGetTopic() {
454         assertEquals(MY_TOPIC, state.getTopic());
455     }
456
457     /**
458      * State used for testing purposes, with abstract methods implemented.
459      */
460     private static class MyState extends State {
461
462         public MyState(PoolingManager mgr) {
463             super(mgr);
464         }
465     }
466 }