Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / state / StateTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling.state;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.onap.policy.drools.pooling.CancellableScheduledTask;
37 import org.onap.policy.drools.pooling.PoolingManager;
38 import org.onap.policy.drools.pooling.message.BucketAssignments;
39 import org.onap.policy.drools.pooling.message.Heartbeat;
40 import org.onap.policy.drools.pooling.message.Identification;
41 import org.onap.policy.drools.pooling.message.Leader;
42 import org.onap.policy.drools.pooling.message.Offline;
43 import org.onap.policy.drools.pooling.message.Query;
44
45 public class StateTest extends SupportBasicStateTester {
46
47     private State state;
48
49     /**
50      * Setup.
51      */
52     @Before
53     public void setUp() throws Exception {
54         super.setUp();
55
56         state = new MyState(mgr);
57     }
58
59     @Test
60     public void testStatePoolingManager() {
61         /*
62          * Prove the state is attached to the manager by invoking getHost(), which
63          * delegates to the manager.
64          */
65         assertEquals(MY_HOST, state.getHost());
66     }
67
68     @Test
69     public void testStateState() {
70         // allocate a new state, copying from the old state
71         state = new MyState(mgr);
72
73         /*
74          * Prove the state is attached to the manager by invoking getHost(), which
75          * delegates to the manager.
76          */
77         assertEquals(MY_HOST, state.getHost());
78     }
79
80     @Test
81     public void testCancelTimers() {
82         int delay = 100;
83         int initDelay = 200;
84
85         /*
86          * Create three tasks tasks.
87          */
88
89         StateTimerTask task1 = mock(StateTimerTask.class);
90         StateTimerTask task2 = mock(StateTimerTask.class);
91         StateTimerTask task3 = mock(StateTimerTask.class);
92
93         // two tasks via schedule()
94         state.schedule(delay, task1);
95         state.schedule(delay, task2);
96
97         // one task via scheduleWithFixedDelay()
98         state.scheduleWithFixedDelay(initDelay, delay, task3);
99
100         // ensure all were scheduled, but not yet canceled
101         verify(mgr).schedule(delay, task1);
102         verify(mgr).schedule(delay, task2);
103         verify(mgr).scheduleWithFixedDelay(initDelay, delay, task3);
104
105         CancellableScheduledTask sched1 = onceSchedules.removeFirst();
106         CancellableScheduledTask sched2 = onceSchedules.removeFirst();
107         CancellableScheduledTask sched3 = repeatedSchedules.removeFirst();
108
109         verify(sched1, never()).cancel();
110         verify(sched2, never()).cancel();
111         verify(sched3, never()).cancel();
112
113         /*
114          * Cancel the timers.
115          */
116         state.cancelTimers();
117
118         // verify that all were cancelled
119         verify(sched1).cancel();
120         verify(sched2).cancel();
121         verify(sched3).cancel();
122     }
123
124     @Test
125     public void testStart() {
126         assertThatCode(() -> state.start()).doesNotThrowAnyException();
127     }
128
129     @Test
130     public void testGoStart() {
131         State next = mock(State.class);
132         when(mgr.goStart()).thenReturn(next);
133
134         State next2 = state.goStart();
135         assertEquals(next, next2);
136     }
137
138     @Test
139     public void testGoQuery() {
140         State next = mock(State.class);
141         when(mgr.goQuery()).thenReturn(next);
142
143         State next2 = state.goQuery();
144         assertEquals(next, next2);
145     }
146
147     @Test
148     public void testGoActive_WithAssignment() {
149         State act = mock(State.class);
150         State inact = mock(State.class);
151
152         when(mgr.goActive()).thenReturn(act);
153         when(mgr.goInactive()).thenReturn(inact);
154
155         String[] arr = {HOST2, PREV_HOST, MY_HOST};
156         BucketAssignments asgn = new BucketAssignments(arr);
157
158         assertEquals(act, state.goActive(asgn));
159
160         verify(mgr).startDistributing(asgn);
161     }
162
163     @Test
164     public void testGoActive_WithoutAssignment() {
165         State act = mock(State.class);
166         State inact = mock(State.class);
167
168         when(mgr.goActive()).thenReturn(act);
169         when(mgr.goInactive()).thenReturn(inact);
170
171         String[] arr = {HOST2, PREV_HOST};
172         BucketAssignments asgn = new BucketAssignments(arr);
173
174         assertEquals(inact, state.goActive(asgn));
175
176         verify(mgr).startDistributing(asgn);
177     }
178
179     @Test
180     public void testGoActive_NullAssignment() {
181         State act = mock(State.class);
182         State inact = mock(State.class);
183
184         when(mgr.goActive()).thenReturn(act);
185         when(mgr.goInactive()).thenReturn(inact);
186
187         assertEquals(inact, state.goActive(null));
188
189         verify(mgr, never()).startDistributing(any());
190     }
191
192     @Test
193     public void testGoInactive() {
194         State next = mock(State.class);
195         when(mgr.goInactive()).thenReturn(next);
196
197         State next2 = state.goInactive();
198         assertEquals(next, next2);
199     }
200
201     @Test
202     public void testProcessHeartbeat() {
203         assertNull(state.process(new Heartbeat()));
204     }
205
206     @Test
207     public void testProcessIdentification() {
208         assertNull(state.process(new Identification()));
209     }
210
211     @Test
212     public void testProcessLeader() {
213         String[] arr = {HOST2, HOST1};
214         BucketAssignments asgn = new BucketAssignments(arr);
215         Leader msg = new Leader(HOST1, asgn);
216
217         // should ignore it
218         assertEquals(null, state.process(msg));
219         verify(mgr).startDistributing(asgn);
220     }
221
222     @Test
223     public void testProcessLeader_Invalid() {
224         Leader msg = new Leader(PREV_HOST, null);
225
226         // should stay in the same state, and not start distributing
227         assertNull(state.process(msg));
228         verify(mgr, never()).startDistributing(any());
229         verify(mgr, never()).goActive();
230         verify(mgr, never()).goInactive();
231     }
232
233     @Test
234     public void testIsValidLeader_NullAssignment() {
235         assertFalse(state.isValid(new Leader(PREV_HOST, null)));
236     }
237
238     @Test
239     public void testIsValidLeader_NullSource() {
240         String[] arr = {HOST2, PREV_HOST, MY_HOST};
241         BucketAssignments asgn = new BucketAssignments(arr);
242         assertFalse(state.isValid(new Leader(null, asgn)));
243     }
244
245     @Test
246     public void testIsValidLeader_EmptyAssignment() {
247         assertFalse(state.isValid(new Leader(PREV_HOST, new BucketAssignments())));
248     }
249
250     @Test
251     public void testIsValidLeader_FromSelf() {
252         String[] arr = {HOST2, MY_HOST};
253         BucketAssignments asgn = new BucketAssignments(arr);
254
255         assertFalse(state.isValid(new Leader(MY_HOST, asgn)));
256     }
257
258     @Test
259     public void testIsValidLeader_WrongLeader() {
260         String[] arr = {HOST2, HOST3};
261         BucketAssignments asgn = new BucketAssignments(arr);
262
263         assertFalse(state.isValid(new Leader(HOST1, asgn)));
264     }
265
266     @Test
267     public void testIsValidLeader() {
268         String[] arr = {HOST2, HOST1};
269         BucketAssignments asgn = new BucketAssignments(arr);
270
271         assertTrue(state.isValid(new Leader(HOST1, asgn)));
272     }
273
274     @Test
275     public void testProcessOffline() {
276         assertNull(state.process(new Offline()));
277     }
278
279     @Test
280     public void testProcessQuery() {
281         assertNull(state.process(new Query()));
282     }
283
284     @Test
285     public void testPublishIdentification() {
286         Identification msg = new Identification();
287         state.publish(msg);
288
289         verify(mgr).publishAdmin(msg);
290     }
291
292     @Test
293     public void testPublishLeader() {
294         Leader msg = new Leader();
295         state.publish(msg);
296
297         verify(mgr).publishAdmin(msg);
298     }
299
300     @Test
301     public void testPublishOffline() {
302         Offline msg = new Offline();
303         state.publish(msg);
304
305         verify(mgr).publishAdmin(msg);
306     }
307
308     @Test
309     public void testPublishQuery() {
310         Query msg = new Query();
311         state.publish(msg);
312
313         verify(mgr).publishAdmin(msg);
314     }
315
316     @Test
317     public void testPublishStringHeartbeat() {
318         String chnl = "channelH";
319         Heartbeat msg = new Heartbeat();
320
321         state.publish(chnl, msg);
322
323         verify(mgr).publish(chnl, msg);
324     }
325
326     @Test
327     public void testStartDistributing() {
328         BucketAssignments asgn = new BucketAssignments();
329         state.startDistributing(asgn);
330
331         verify(mgr).startDistributing(asgn);
332     }
333
334     @Test
335     public void testStartDistributing_NullAssignments() {
336         state.startDistributing(null);
337
338         verify(mgr, never()).startDistributing(any());
339     }
340
341     @Test
342     public void testSchedule() {
343         int delay = 100;
344
345         StateTimerTask task = mock(StateTimerTask.class);
346
347         state.schedule(delay, task);
348
349         CancellableScheduledTask sched = onceSchedules.removeFirst();
350
351         // scheduled, but not canceled yet
352         verify(mgr).schedule(delay, task);
353         verify(sched, never()).cancel();
354
355         /*
356          * Ensure the state added the timer to its list by telling it to cancel its timers
357          * and then seeing if this timer was canceled.
358          */
359         state.cancelTimers();
360         verify(sched).cancel();
361     }
362
363     @Test
364     public void testScheduleWithFixedDelay() {
365         int initdel = 100;
366         int delay = 200;
367
368         StateTimerTask task = mock(StateTimerTask.class);
369
370         state.scheduleWithFixedDelay(initdel, delay, task);
371
372         CancellableScheduledTask sched = repeatedSchedules.removeFirst();
373
374         // scheduled, but not canceled yet
375         verify(mgr).scheduleWithFixedDelay(initdel, delay, task);
376         verify(sched, never()).cancel();
377
378         /*
379          * Ensure the state added the timer to its list by telling it to cancel its timers
380          * and then seeing if this timer was canceled.
381          */
382         state.cancelTimers();
383         verify(sched).cancel();
384     }
385
386     @Test
387     public void testMissedHeartbeat() {
388         State next = mock(State.class);
389         when(mgr.goStart()).thenReturn(next);
390
391         State next2 = state.missedHeartbeat();
392         assertEquals(next, next2);
393
394         // should continue to distribute
395         verify(mgr, never()).startDistributing(null);
396
397         Offline msg = captureAdminMessage(Offline.class);
398         assertEquals(MY_HOST, msg.getSource());
399     }
400
401     @Test
402     public void testInternalTopicFailed() {
403         State next = mock(State.class);
404         when(mgr.goInactive()).thenReturn(next);
405
406         State next2 = state.internalTopicFailed();
407         assertEquals(next, next2);
408
409         // should stop distributing
410         verify(mgr).startDistributing(null);
411
412         Offline msg = captureAdminMessage(Offline.class);
413         assertEquals(MY_HOST, msg.getSource());
414     }
415
416     @Test
417     public void testMakeHeartbeat() {
418         long timestamp = 30000L;
419         Heartbeat msg = state.makeHeartbeat(timestamp);
420
421         assertEquals(MY_HOST, msg.getSource());
422         assertEquals(timestamp, msg.getTimestampMs());
423     }
424
425     @Test
426     public void testMakeIdentification() {
427         Identification ident = state.makeIdentification();
428         assertEquals(MY_HOST, ident.getSource());
429         assertEquals(ASGN3, ident.getAssignments());
430     }
431
432     @Test
433     public void testMakeOffline() {
434         Offline msg = state.makeOffline();
435
436         assertEquals(MY_HOST, msg.getSource());
437     }
438
439     @Test
440     public void testMakeQuery() {
441         Query msg = state.makeQuery();
442
443         assertEquals(MY_HOST, msg.getSource());
444     }
445
446     @Test
447     public void testGetHost() {
448         assertEquals(MY_HOST, state.getHost());
449     }
450
451     @Test
452     public void testGetTopic() {
453         assertEquals(MY_TOPIC, state.getTopic());
454     }
455
456     /**
457      * State used for testing purposes, with abstract methods implemented.
458      */
459     private class MyState extends State {
460
461         public MyState(PoolingManager mgr) {
462             super(mgr);
463         }
464     }
465 }