2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling.state;
23 import static org.junit.Assert.assertNotNull;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyLong;
26 import static org.mockito.ArgumentMatchers.anyString;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.when;
31 import java.util.Arrays;
32 import java.util.LinkedList;
33 import java.util.List;
35 import java.util.SortedSet;
36 import java.util.TreeSet;
37 import java.util.concurrent.atomic.AtomicReference;
38 import org.onap.policy.drools.pooling.CancellableScheduledTask;
39 import org.onap.policy.drools.pooling.PoolingManager;
40 import org.onap.policy.drools.pooling.PoolingProperties;
41 import org.onap.policy.drools.pooling.message.BucketAssignments;
42 import org.onap.policy.drools.pooling.message.Leader;
43 import org.onap.policy.drools.pooling.message.Message;
44 import org.onap.policy.drools.utils.Pair;
45 import org.onap.policy.drools.utils.Triple;
48 * Superclass used to test subclasses of {@link State}.
50 public class BasicStateTester {
52 protected static final long STD_HEARTBEAT_WAIT_MS = 10;
53 protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
54 protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
55 protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
56 protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
58 protected static final String MY_TOPIC = "myTopic";
60 protected static final String PREV_HOST = "prevHost";
61 protected static final String PREV_HOST2 = PREV_HOST + "A";
63 // this follows PREV_HOST, alphabetically
64 protected static final String MY_HOST = PREV_HOST + "X";
66 // these follow MY_HOST, alphabetically
67 protected static final String HOST1 = MY_HOST + "1";
68 protected static final String HOST2 = MY_HOST + "2";
69 protected static final String HOST3 = MY_HOST + "3";
70 protected static final String HOST4 = MY_HOST + "4";
72 protected static final String LEADER = HOST1;
74 protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2};
76 protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments();
77 protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
80 * Scheduled tasks returned by schedule().
82 protected LinkedList<CancellableScheduledTask> onceSchedules;
85 * Tasks captured via schedule().
87 protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
90 * Scheduled tasks returned by scheduleWithFixedDelay().
92 protected LinkedList<CancellableScheduledTask> repeatedSchedules;
95 * Tasks captured via scheduleWithFixedDelay().
97 protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks;
100 * Messages captured via publish().
102 protected LinkedList<Pair<String, Message>> published;
105 * Messages captured via publishAdmin().
107 protected LinkedList<Message> admin;
109 protected PoolingManager mgr;
110 protected PoolingProperties props;
111 protected State prevState;
113 public BasicStateTester() {
120 * @throws Exception throws exception
122 public void setUp() throws Exception {
123 onceSchedules = new LinkedList<>();
124 onceTasks = new LinkedList<>();
126 repeatedSchedules = new LinkedList<>();
127 repeatedTasks = new LinkedList<>();
129 published = new LinkedList<>();
130 admin = new LinkedList<>();
132 mgr = mock(PoolingManager.class);
133 props = mock(PoolingProperties.class);
135 when(mgr.getHost()).thenReturn(MY_HOST);
136 when(mgr.getTopic()).thenReturn(MY_TOPIC);
137 when(mgr.getProperties()).thenReturn(props);
139 when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
140 when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
141 when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
142 when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
143 when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
145 prevState = new State(mgr) {
147 public Map<String, Object> getFilter() {
148 throw new UnsupportedOperationException("cannot filter");
152 // capture publish() arguments
153 doAnswer(invocation -> {
154 Object[] args = invocation.getArguments();
155 published.add(new Pair<>((String) args[0], (Message) args[1]));
158 }).when(mgr).publish(anyString(), any(Message.class));
160 // capture publishAdmin() arguments
161 doAnswer(invocation -> {
162 Object[] args = invocation.getArguments();
163 admin.add((Message) args[0]);
166 }).when(mgr).publishAdmin(any(Message.class));
168 // capture schedule() arguments, and return a new future
169 when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
170 Object[] args = invocation.getArguments();
171 onceTasks.add(new Pair<>((Long) args[0], (StateTimerTask) args[1]));
173 CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
174 onceSchedules.add(sched);
178 // capture scheduleWithFixedDelay() arguments, and return a new future
179 when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
180 Object[] args = invocation.getArguments();
181 repeatedTasks.add(new Triple<>((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
183 CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
184 repeatedSchedules.add(sched);
188 // get/set assignments in the manager
189 AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3);
191 when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
194 asgn.set(args.getArgument(0));
196 }).when(mgr).startDistributing(any());
200 * Makes a sorted set of hosts.
202 * @param hosts the hosts to be sorted
203 * @return the set of hosts, sorted
205 protected SortedSet<String> sortHosts(String... hosts) {
206 return new TreeSet<>(Arrays.asList(hosts));
210 * Captures the host array from the Leader message published to the admin channel.
212 * @return the host array, as a list
214 protected List<String> captureHostList() {
215 return Arrays.asList(captureHostArray());
219 * Captures the host array from the Leader message published to the admin channel.
221 * @return the host array
223 protected String[] captureHostArray() {
224 BucketAssignments asgn = captureAssignments();
226 String[] arr = asgn.getHostArray();
233 * Captures the assignments from the Leader message published to the admin channel.
235 * @return the bucket assignments
237 protected BucketAssignments captureAssignments() {
238 Leader msg = captureAdminMessage(Leader.class);
240 BucketAssignments asgn = msg.getAssignments();
246 * Captures the message published to the admin channel.
248 * @param clazz type of {@link Message} to capture
249 * @return the message that was published
251 protected <T extends Message> T captureAdminMessage(Class<T> clazz) {
252 return captureAdminMessage(clazz, 0);
256 * Captures the message published to the admin channel.
258 * @param clazz type of {@link Message} to capture
259 * @param index index of the item to be captured
260 * @return the message that was published
262 protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) {
263 return clazz.cast(admin.get(index));
267 * Captures the message published to the non-admin channels.
269 * @param clazz type of {@link Message} to capture
270 * @return the (channel,message) pair that was published
272 protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) {
273 return capturePublishedMessage(clazz, 0);
277 * Captures the message published to the non-admin channels.
279 * @param clazz type of {@link Message} to capture
280 * @param index index of the item to be captured
281 * @return the (channel,message) pair that was published
283 protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
284 Pair<String, Message> msg = published.get(index);
285 return new Pair<>(msg.first(), clazz.cast(msg.second()));