Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / test / java / org / onap / policy / drools / pooling / state / SupportBasicStateTester.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2020, 2024 Nordix Foundation
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import static org.junit.jupiter.api.Assertions.assertNotNull;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyLong;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
31
32 import java.util.Arrays;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.SortedSet;
36 import java.util.TreeSet;
37 import java.util.concurrent.atomic.AtomicReference;
38 import org.apache.commons.lang3.tuple.Pair;
39 import org.apache.commons.lang3.tuple.Triple;
40 import org.onap.policy.drools.pooling.CancellableScheduledTask;
41 import org.onap.policy.drools.pooling.PoolingManager;
42 import org.onap.policy.drools.pooling.PoolingProperties;
43 import org.onap.policy.drools.pooling.message.BucketAssignments;
44 import org.onap.policy.drools.pooling.message.Leader;
45 import org.onap.policy.drools.pooling.message.Message;
46
47 /**
48  * Superclass used to test subclasses of {@link State}.
49  */
50 public class SupportBasicStateTester {
51
52     protected static final long STD_HEARTBEAT_WAIT_MS = 10;
53     protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
54     protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
55     protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
56     protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
57
58     protected static final String MY_TOPIC = "myTopic";
59
60     protected static final String PREV_HOST = "prevHost";
61     protected static final String PREV_HOST2 = PREV_HOST + "A";
62
63     // this follows PREV_HOST, alphabetically
64     protected static final String MY_HOST = PREV_HOST + "X";
65
66     // these follow MY_HOST, alphabetically
67     protected static final String HOST1 = MY_HOST + "1";
68     protected static final String HOST2 = MY_HOST + "2";
69     protected static final String HOST3 = MY_HOST + "3";
70     protected static final String HOST4 = MY_HOST + "4";
71
72     protected static final String LEADER = HOST1;
73
74     protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2};
75
76     protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments();
77     protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
78
79     /**
80      * Scheduled tasks returned by schedule().
81      */
82     protected LinkedList<CancellableScheduledTask> onceSchedules;
83
84     /**
85      * Tasks captured via schedule().
86      */
87     protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
88
89     /**
90      * Scheduled tasks returned by scheduleWithFixedDelay().
91      */
92     protected LinkedList<CancellableScheduledTask> repeatedSchedules;
93
94     /**
95      * Tasks captured via scheduleWithFixedDelay().
96      */
97     protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks;
98
99     /**
100      * Messages captured via publish().
101      */
102     protected LinkedList<Pair<String, Message>> published;
103
104     /**
105      * Messages captured via publishAdmin().
106      */
107     protected LinkedList<Message> admin;
108
109     protected PoolingManager mgr;
110     protected PoolingProperties props;
111     protected State prevState;
112
113     public SupportBasicStateTester() {
114         super();
115     }
116
117     /**
118      * Setup.
119      *
120      * @throws Exception throws exception
121      */
122     public void setUp() throws Exception {
123         onceSchedules = new LinkedList<>();
124         onceTasks = new LinkedList<>();
125
126         repeatedSchedules = new LinkedList<>();
127         repeatedTasks = new LinkedList<>();
128
129         published = new LinkedList<>();
130         admin = new LinkedList<>();
131
132         mgr = mock(PoolingManager.class);
133         props = mock(PoolingProperties.class);
134
135         when(mgr.getHost()).thenReturn(MY_HOST);
136         when(mgr.getTopic()).thenReturn(MY_TOPIC);
137         when(mgr.getProperties()).thenReturn(props);
138
139         when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
140         when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
141         when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
142         when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
143         when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
144
145         prevState = new State(mgr) {};
146
147         // capture publish() arguments
148         doAnswer(invocation -> {
149             Object[] args = invocation.getArguments();
150             published.add(Pair.of((String) args[0], (Message) args[1]));
151
152             return null;
153         }).when(mgr).publish(anyString(), any(Message.class));
154
155         // capture publishAdmin() arguments
156         doAnswer(invocation -> {
157             Object[] args = invocation.getArguments();
158             admin.add((Message) args[0]);
159
160             return null;
161         }).when(mgr).publishAdmin(any(Message.class));
162
163         // capture schedule() arguments, and return a new future
164         when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
165             Object[] args = invocation.getArguments();
166             onceTasks.add(Pair.of((Long) args[0], (StateTimerTask) args[1]));
167
168             CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
169             onceSchedules.add(sched);
170             return sched;
171         });
172
173         // capture scheduleWithFixedDelay() arguments, and return a new future
174         when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
175             Object[] args = invocation.getArguments();
176             repeatedTasks.add(Triple.of((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
177
178             CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
179             repeatedSchedules.add(sched);
180             return sched;
181         });
182
183         // get/set assignments in the manager
184         AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3);
185
186         when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
187
188         doAnswer(args -> {
189             asgn.set(args.getArgument(0));
190             return null;
191         }).when(mgr).startDistributing(any());
192     }
193
194     /**
195      * Makes a sorted set of hosts.
196      *
197      * @param hosts the hosts to be sorted
198      * @return the set of hosts, sorted
199      */
200     protected SortedSet<String> sortHosts(String... hosts) {
201         return new TreeSet<>(Arrays.asList(hosts));
202     }
203
204     /**
205      * Captures the host array from the Leader message published to the admin channel.
206      *
207      * @return the host array, as a list
208      */
209     protected List<String> captureHostList() {
210         return Arrays.asList(captureHostArray());
211     }
212
213     /**
214      * Captures the host array from the Leader message published to the admin channel.
215      *
216      * @return the host array
217      */
218     protected String[] captureHostArray() {
219         BucketAssignments asgn = captureAssignments();
220
221         String[] arr = asgn.getHostArray();
222         assertNotNull(arr);
223
224         return arr;
225     }
226
227     /**
228      * Captures the assignments from the Leader message published to the admin channel.
229      *
230      * @return the bucket assignments
231      */
232     protected BucketAssignments captureAssignments() {
233         Leader msg = captureAdminMessage(Leader.class);
234
235         BucketAssignments asgn = msg.getAssignments();
236         assertNotNull(asgn);
237         return asgn;
238     }
239
240     /**
241      * Captures the message published to the admin channel.
242      *
243      * @param clazz type of {@link Message} to capture
244      * @return the message that was published
245      */
246     protected <T extends Message> T captureAdminMessage(Class<T> clazz) {
247         return captureAdminMessage(clazz, 0);
248     }
249
250     /**
251      * Captures the message published to the admin channel.
252      *
253      * @param clazz type of {@link Message} to capture
254      * @param index index of the item to be captured
255      * @return the message that was published
256      */
257     protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) {
258         return clazz.cast(admin.get(index));
259     }
260
261     /**
262      * Captures the message published to the non-admin channels.
263      *
264      * @param clazz type of {@link Message} to capture
265      * @return the (channel,message) pair that was published
266      */
267     protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) {
268         return capturePublishedMessage(clazz, 0);
269     }
270
271     /**
272      * Captures the message published to the non-admin channels.
273      *
274      * @param clazz type of {@link Message} to capture
275      * @param index index of the item to be captured
276      * @return the (channel,message) pair that was published
277      */
278     protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
279         Pair<String, Message> msg = published.get(index);
280         return Pair.of(msg.getLeft(), clazz.cast(msg.getRight()));
281     }
282 }