2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2020, 2024 Nordix Foundation
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling.state;
24 import static org.junit.jupiter.api.Assertions.assertNotNull;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyLong;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
32 import java.util.Arrays;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.SortedSet;
36 import java.util.TreeSet;
37 import java.util.concurrent.atomic.AtomicReference;
38 import org.apache.commons.lang3.tuple.Pair;
39 import org.apache.commons.lang3.tuple.Triple;
40 import org.onap.policy.drools.pooling.CancellableScheduledTask;
41 import org.onap.policy.drools.pooling.PoolingManager;
42 import org.onap.policy.drools.pooling.PoolingProperties;
43 import org.onap.policy.drools.pooling.message.BucketAssignments;
44 import org.onap.policy.drools.pooling.message.Leader;
45 import org.onap.policy.drools.pooling.message.Message;
48 * Superclass used to test subclasses of {@link State}.
50 public class SupportBasicStateTester {
52 protected static final long STD_HEARTBEAT_WAIT_MS = 10;
53 protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
54 protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
55 protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
56 protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
58 protected static final String MY_TOPIC = "myTopic";
60 protected static final String PREV_HOST = "prevHost";
61 protected static final String PREV_HOST2 = PREV_HOST + "A";
63 // this follows PREV_HOST, alphabetically
64 protected static final String MY_HOST = PREV_HOST + "X";
66 // these follow MY_HOST, alphabetically
67 protected static final String HOST1 = MY_HOST + "1";
68 protected static final String HOST2 = MY_HOST + "2";
69 protected static final String HOST3 = MY_HOST + "3";
70 protected static final String HOST4 = MY_HOST + "4";
72 protected static final String LEADER = HOST1;
74 protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2};
76 protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments();
77 protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
80 * Scheduled tasks returned by schedule().
82 protected LinkedList<CancellableScheduledTask> onceSchedules;
85 * Tasks captured via schedule().
87 protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
90 * Scheduled tasks returned by scheduleWithFixedDelay().
92 protected LinkedList<CancellableScheduledTask> repeatedSchedules;
95 * Tasks captured via scheduleWithFixedDelay().
97 protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks;
100 * Messages captured via publish().
102 protected LinkedList<Pair<String, Message>> published;
105 * Messages captured via publishAdmin().
107 protected LinkedList<Message> admin;
109 protected PoolingManager mgr;
110 protected PoolingProperties props;
111 protected State prevState;
113 public SupportBasicStateTester() {
120 * @throws Exception throws exception
122 public void setUp() throws Exception {
123 onceSchedules = new LinkedList<>();
124 onceTasks = new LinkedList<>();
126 repeatedSchedules = new LinkedList<>();
127 repeatedTasks = new LinkedList<>();
129 published = new LinkedList<>();
130 admin = new LinkedList<>();
132 mgr = mock(PoolingManager.class);
133 props = mock(PoolingProperties.class);
135 when(mgr.getHost()).thenReturn(MY_HOST);
136 when(mgr.getTopic()).thenReturn(MY_TOPIC);
137 when(mgr.getProperties()).thenReturn(props);
139 when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
140 when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
141 when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
142 when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
143 when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
145 prevState = new State(mgr) {};
147 // capture publish() arguments
148 doAnswer(invocation -> {
149 Object[] args = invocation.getArguments();
150 published.add(Pair.of((String) args[0], (Message) args[1]));
153 }).when(mgr).publish(anyString(), any(Message.class));
155 // capture publishAdmin() arguments
156 doAnswer(invocation -> {
157 Object[] args = invocation.getArguments();
158 admin.add((Message) args[0]);
161 }).when(mgr).publishAdmin(any(Message.class));
163 // capture schedule() arguments, and return a new future
164 when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
165 Object[] args = invocation.getArguments();
166 onceTasks.add(Pair.of((Long) args[0], (StateTimerTask) args[1]));
168 CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
169 onceSchedules.add(sched);
173 // capture scheduleWithFixedDelay() arguments, and return a new future
174 when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
175 Object[] args = invocation.getArguments();
176 repeatedTasks.add(Triple.of((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
178 CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
179 repeatedSchedules.add(sched);
183 // get/set assignments in the manager
184 AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3);
186 when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
189 asgn.set(args.getArgument(0));
191 }).when(mgr).startDistributing(any());
195 * Makes a sorted set of hosts.
197 * @param hosts the hosts to be sorted
198 * @return the set of hosts, sorted
200 protected SortedSet<String> sortHosts(String... hosts) {
201 return new TreeSet<>(Arrays.asList(hosts));
205 * Captures the host array from the Leader message published to the admin channel.
207 * @return the host array, as a list
209 protected List<String> captureHostList() {
210 return Arrays.asList(captureHostArray());
214 * Captures the host array from the Leader message published to the admin channel.
216 * @return the host array
218 protected String[] captureHostArray() {
219 BucketAssignments asgn = captureAssignments();
221 String[] arr = asgn.getHostArray();
228 * Captures the assignments from the Leader message published to the admin channel.
230 * @return the bucket assignments
232 protected BucketAssignments captureAssignments() {
233 Leader msg = captureAdminMessage(Leader.class);
235 BucketAssignments asgn = msg.getAssignments();
241 * Captures the message published to the admin channel.
243 * @param clazz type of {@link Message} to capture
244 * @return the message that was published
246 protected <T extends Message> T captureAdminMessage(Class<T> clazz) {
247 return captureAdminMessage(clazz, 0);
251 * Captures the message published to the admin channel.
253 * @param clazz type of {@link Message} to capture
254 * @param index index of the item to be captured
255 * @return the message that was published
257 protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) {
258 return clazz.cast(admin.get(index));
262 * Captures the message published to the non-admin channels.
264 * @param clazz type of {@link Message} to capture
265 * @return the (channel,message) pair that was published
267 protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) {
268 return capturePublishedMessage(clazz, 0);
272 * Captures the message published to the non-admin channels.
274 * @param clazz type of {@link Message} to capture
275 * @param index index of the item to be captured
276 * @return the (channel,message) pair that was published
278 protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
279 Pair<String, Message> msg = published.get(index);
280 return Pair.of(msg.getLeft(), clazz.cast(msg.getRight()));