2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2020 Nordix Foundation
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling.state;
24 import static org.junit.Assert.assertNotNull;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyLong;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
32 import java.util.Arrays;
33 import java.util.LinkedList;
34 import java.util.List;
36 import java.util.SortedSet;
37 import java.util.TreeSet;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.apache.commons.lang3.tuple.Pair;
40 import org.onap.policy.drools.pooling.CancellableScheduledTask;
41 import org.onap.policy.drools.pooling.PoolingManager;
42 import org.onap.policy.drools.pooling.PoolingProperties;
43 import org.onap.policy.drools.pooling.message.BucketAssignments;
44 import org.onap.policy.drools.pooling.message.Leader;
45 import org.onap.policy.drools.pooling.message.Message;
46 import org.onap.policy.drools.utils.Triple;
49 * Superclass used to test subclasses of {@link State}.
51 public class SupportBasicStateTester {
53 protected static final long STD_HEARTBEAT_WAIT_MS = 10;
54 protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
55 protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
56 protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
57 protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
59 protected static final String MY_TOPIC = "myTopic";
61 protected static final String PREV_HOST = "prevHost";
62 protected static final String PREV_HOST2 = PREV_HOST + "A";
64 // this follows PREV_HOST, alphabetically
65 protected static final String MY_HOST = PREV_HOST + "X";
67 // these follow MY_HOST, alphabetically
68 protected static final String HOST1 = MY_HOST + "1";
69 protected static final String HOST2 = MY_HOST + "2";
70 protected static final String HOST3 = MY_HOST + "3";
71 protected static final String HOST4 = MY_HOST + "4";
73 protected static final String LEADER = HOST1;
75 protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2};
77 protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments();
78 protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
81 * Scheduled tasks returned by schedule().
83 protected LinkedList<CancellableScheduledTask> onceSchedules;
86 * Tasks captured via schedule().
88 protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
91 * Scheduled tasks returned by scheduleWithFixedDelay().
93 protected LinkedList<CancellableScheduledTask> repeatedSchedules;
96 * Tasks captured via scheduleWithFixedDelay().
98 protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks;
101 * Messages captured via publish().
103 protected LinkedList<Pair<String, Message>> published;
106 * Messages captured via publishAdmin().
108 protected LinkedList<Message> admin;
110 protected PoolingManager mgr;
111 protected PoolingProperties props;
112 protected State prevState;
114 public SupportBasicStateTester() {
121 * @throws Exception throws exception
123 public void setUp() throws Exception {
124 onceSchedules = new LinkedList<>();
125 onceTasks = new LinkedList<>();
127 repeatedSchedules = new LinkedList<>();
128 repeatedTasks = new LinkedList<>();
130 published = new LinkedList<>();
131 admin = new LinkedList<>();
133 mgr = mock(PoolingManager.class);
134 props = mock(PoolingProperties.class);
136 when(mgr.getHost()).thenReturn(MY_HOST);
137 when(mgr.getTopic()).thenReturn(MY_TOPIC);
138 when(mgr.getProperties()).thenReturn(props);
140 when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
141 when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
142 when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
143 when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
144 when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
146 prevState = new State(mgr) {
148 public Map<String, Object> getFilter() {
149 throw new UnsupportedOperationException("cannot filter");
153 // capture publish() arguments
154 doAnswer(invocation -> {
155 Object[] args = invocation.getArguments();
156 published.add(Pair.of((String) args[0], (Message) args[1]));
159 }).when(mgr).publish(anyString(), any(Message.class));
161 // capture publishAdmin() arguments
162 doAnswer(invocation -> {
163 Object[] args = invocation.getArguments();
164 admin.add((Message) args[0]);
167 }).when(mgr).publishAdmin(any(Message.class));
169 // capture schedule() arguments, and return a new future
170 when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
171 Object[] args = invocation.getArguments();
172 onceTasks.add(Pair.of((Long) args[0], (StateTimerTask) args[1]));
174 CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
175 onceSchedules.add(sched);
179 // capture scheduleWithFixedDelay() arguments, and return a new future
180 when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
181 Object[] args = invocation.getArguments();
182 repeatedTasks.add(new Triple<>((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
184 CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
185 repeatedSchedules.add(sched);
189 // get/set assignments in the manager
190 AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3);
192 when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
195 asgn.set(args.getArgument(0));
197 }).when(mgr).startDistributing(any());
201 * Makes a sorted set of hosts.
203 * @param hosts the hosts to be sorted
204 * @return the set of hosts, sorted
206 protected SortedSet<String> sortHosts(String... hosts) {
207 return new TreeSet<>(Arrays.asList(hosts));
211 * Captures the host array from the Leader message published to the admin channel.
213 * @return the host array, as a list
215 protected List<String> captureHostList() {
216 return Arrays.asList(captureHostArray());
220 * Captures the host array from the Leader message published to the admin channel.
222 * @return the host array
224 protected String[] captureHostArray() {
225 BucketAssignments asgn = captureAssignments();
227 String[] arr = asgn.getHostArray();
234 * Captures the assignments from the Leader message published to the admin channel.
236 * @return the bucket assignments
238 protected BucketAssignments captureAssignments() {
239 Leader msg = captureAdminMessage(Leader.class);
241 BucketAssignments asgn = msg.getAssignments();
247 * Captures the message published to the admin channel.
249 * @param clazz type of {@link Message} to capture
250 * @return the message that was published
252 protected <T extends Message> T captureAdminMessage(Class<T> clazz) {
253 return captureAdminMessage(clazz, 0);
257 * Captures the message published to the admin channel.
259 * @param clazz type of {@link Message} to capture
260 * @param index index of the item to be captured
261 * @return the message that was published
263 protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) {
264 return clazz.cast(admin.get(index));
268 * Captures the message published to the non-admin channels.
270 * @param clazz type of {@link Message} to capture
271 * @return the (channel,message) pair that was published
273 protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) {
274 return capturePublishedMessage(clazz, 0);
278 * Captures the message published to the non-admin channels.
280 * @param clazz type of {@link Message} to capture
281 * @param index index of the item to be captured
282 * @return the (channel,message) pair that was published
284 protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
285 Pair<String, Message> msg = published.get(index);
286 return Pair.of(msg.getLeft(), clazz.cast(msg.getRight()));