Fix checkstyle for features submodules.
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / state / BasicStateTester.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling.state;
22
23 import static org.junit.Assert.assertNotNull;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.when;
30
31 import java.util.Arrays;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.SortedSet;
36 import java.util.TreeSet;
37 import java.util.concurrent.atomic.AtomicReference;
38 import org.onap.policy.drools.pooling.CancellableScheduledTask;
39 import org.onap.policy.drools.pooling.PoolingManager;
40 import org.onap.policy.drools.pooling.PoolingProperties;
41 import org.onap.policy.drools.pooling.message.BucketAssignments;
42 import org.onap.policy.drools.pooling.message.Leader;
43 import org.onap.policy.drools.pooling.message.Message;
44 import org.onap.policy.drools.utils.Pair;
45 import org.onap.policy.drools.utils.Triple;
46
47 /**
48  * Superclass used to test subclasses of {@link State}.
49  */
50 public class BasicStateTester {
51
52     protected static final long STD_HEARTBEAT_WAIT_MS = 10;
53     protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
54     protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
55     protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
56     protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
57
58     protected static final String MY_TOPIC = "myTopic";
59
60     protected static final String PREV_HOST = "prevHost";
61     protected static final String PREV_HOST2 = PREV_HOST + "A";
62
63     // this follows PREV_HOST, alphabetically
64     protected static final String MY_HOST = PREV_HOST + "X";
65
66     // these follow MY_HOST, alphabetically
67     protected static final String HOST1 = MY_HOST + "1";
68     protected static final String HOST2 = MY_HOST + "2";
69     protected static final String HOST3 = MY_HOST + "3";
70     protected static final String HOST4 = MY_HOST + "4";
71
72     protected static final String LEADER = HOST1;
73
74     protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2};
75
76     protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments();
77     protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
78
79     /**
80      * Scheduled tasks returned by schedule().
81      */
82     protected LinkedList<CancellableScheduledTask> onceSchedules;
83
84     /**
85      * Tasks captured via schedule().
86      */
87     protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
88
89     /**
90      * Scheduled tasks returned by scheduleWithFixedDelay().
91      */
92     protected LinkedList<CancellableScheduledTask> repeatedSchedules;
93
94     /**
95      * Tasks captured via scheduleWithFixedDelay().
96      */
97     protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks;
98
99     /**
100      * Messages captured via publish().
101      */
102     protected LinkedList<Pair<String, Message>> published;
103
104     /**
105      * Messages captured via publishAdmin().
106      */
107     protected LinkedList<Message> admin;
108
109     protected PoolingManager mgr;
110     protected PoolingProperties props;
111     protected State prevState;
112
113     public BasicStateTester() {
114         super();
115     }
116
117     /**
118      * Setup.
119      * 
120      * @throws Exception throws exception
121      */
122     public void setUp() throws Exception {
123         onceSchedules = new LinkedList<>();
124         onceTasks = new LinkedList<>();
125
126         repeatedSchedules = new LinkedList<>();
127         repeatedTasks = new LinkedList<>();
128
129         published = new LinkedList<>();
130         admin = new LinkedList<>();
131
132         mgr = mock(PoolingManager.class);
133         props = mock(PoolingProperties.class);
134
135         when(mgr.getHost()).thenReturn(MY_HOST);
136         when(mgr.getTopic()).thenReturn(MY_TOPIC);
137         when(mgr.getProperties()).thenReturn(props);
138
139         when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
140         when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
141         when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
142         when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
143         when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
144
145         prevState = new State(mgr) {
146             @Override
147             public Map<String, Object> getFilter() {
148                 throw new UnsupportedOperationException("cannot filter");
149             }
150         };
151
152         // capture publish() arguments
153         doAnswer(invocation -> {
154             Object[] args = invocation.getArguments();
155             published.add(new Pair<>((String) args[0], (Message) args[1]));
156
157             return null;
158         }).when(mgr).publish(anyString(), any(Message.class));
159
160         // capture publishAdmin() arguments
161         doAnswer(invocation -> {
162             Object[] args = invocation.getArguments();
163             admin.add((Message) args[0]);
164
165             return null;
166         }).when(mgr).publishAdmin(any(Message.class));
167
168         // capture schedule() arguments, and return a new future
169         when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
170             Object[] args = invocation.getArguments();
171             onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1]));
172
173             CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
174             onceSchedules.add(sched);
175             return sched;
176         });
177
178         // capture scheduleWithFixedDelay() arguments, and return a new future
179         when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
180             Object[] args = invocation.getArguments();
181             repeatedTasks.add(new Triple<>((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
182
183             CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
184             repeatedSchedules.add(sched);
185             return sched;
186         });
187
188         // get/set assignments in the manager
189         AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3);
190
191         when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
192
193         doAnswer(args -> {
194             asgn.set(args.getArgument(0));
195             return null;
196         }).when(mgr).startDistributing(any());
197     }
198
199     /**
200      * Makes a sorted set of hosts.
201      * 
202      * @param hosts the hosts to be sorted
203      * @return the set of hosts, sorted
204      */
205     protected SortedSet<String> sortHosts(String... hosts) {
206         return new TreeSet<>(Arrays.asList(hosts));
207     }
208
209     /**
210      * Captures the host array from the Leader message published to the admin channel.
211      * 
212      * @return the host array, as a list
213      */
214     protected List<String> captureHostList() {
215         return Arrays.asList(captureHostArray());
216     }
217
218     /**
219      * Captures the host array from the Leader message published to the admin channel.
220      * 
221      * @return the host array
222      */
223     protected String[] captureHostArray() {
224         BucketAssignments asgn = captureAssignments();
225
226         String[] arr = asgn.getHostArray();
227         assertNotNull(arr);
228
229         return arr;
230     }
231
232     /**
233      * Captures the assignments from the Leader message published to the admin channel.
234      * 
235      * @return the bucket assignments
236      */
237     protected BucketAssignments captureAssignments() {
238         Leader msg = captureAdminMessage(Leader.class);
239
240         BucketAssignments asgn = msg.getAssignments();
241         assertNotNull(asgn);
242         return asgn;
243     }
244
245     /**
246      * Captures the message published to the admin channel.
247      * 
248      * @param clazz type of {@link Message} to capture
249      * @return the message that was published
250      */
251     protected <T extends Message> T captureAdminMessage(Class<T> clazz) {
252         return captureAdminMessage(clazz, 0);
253     }
254
255     /**
256      * Captures the message published to the admin channel.
257      * 
258      * @param clazz type of {@link Message} to capture
259      * @param index index of the item to be captured
260      * @return the message that was published
261      */
262     protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) {
263         return clazz.cast(admin.get(index));
264     }
265
266     /**
267      * Captures the message published to the non-admin channels.
268      * 
269      * @param clazz type of {@link Message} to capture
270      * @return the (channel,message) pair that was published
271      */
272     protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) {
273         return capturePublishedMessage(clazz, 0);
274     }
275
276     /**
277      * Captures the message published to the non-admin channels.
278      * 
279      * @param clazz type of {@link Message} to capture
280      * @param index index of the item to be captured
281      * @return the (channel,message) pair that was published
282      */
283     protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
284         Pair<String, Message> msg = published.get(index);
285         return new Pair<>(msg.first(), clazz.cast(msg.second()));
286     }
287 }