1a65c80271644a031b2bcedc073a7f7b30f88129
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2020 Nordix Foundation
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling.state;
23
24 import static org.junit.Assert.assertNotNull;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.ArgumentMatchers.anyLong;
27 import static org.mockito.ArgumentMatchers.anyString;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
31
32 import java.util.Arrays;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.SortedSet;
37 import java.util.TreeSet;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.apache.commons.lang3.tuple.Pair;
40 import org.apache.commons.lang3.tuple.Triple;
41 import org.onap.policy.drools.pooling.CancellableScheduledTask;
42 import org.onap.policy.drools.pooling.PoolingManager;
43 import org.onap.policy.drools.pooling.PoolingProperties;
44 import org.onap.policy.drools.pooling.message.BucketAssignments;
45 import org.onap.policy.drools.pooling.message.Leader;
46 import org.onap.policy.drools.pooling.message.Message;
47
48 /**
49  * Superclass used to test subclasses of {@link State}.
50  */
51 public class SupportBasicStateTester {
52
53     protected static final long STD_HEARTBEAT_WAIT_MS = 10;
54     protected static final long STD_REACTIVATE_WAIT_MS = STD_HEARTBEAT_WAIT_MS + 1;
55     protected static final long STD_IDENTIFICATION_MS = STD_REACTIVATE_WAIT_MS + 1;
56     protected static final long STD_ACTIVE_HEARTBEAT_MS = STD_IDENTIFICATION_MS + 1;
57     protected static final long STD_INTER_HEARTBEAT_MS = STD_ACTIVE_HEARTBEAT_MS + 1;
58
59     protected static final String MY_TOPIC = "myTopic";
60
61     protected static final String PREV_HOST = "prevHost";
62     protected static final String PREV_HOST2 = PREV_HOST + "A";
63
64     // this follows PREV_HOST, alphabetically
65     protected static final String MY_HOST = PREV_HOST + "X";
66
67     // these follow MY_HOST, alphabetically
68     protected static final String HOST1 = MY_HOST + "1";
69     protected static final String HOST2 = MY_HOST + "2";
70     protected static final String HOST3 = MY_HOST + "3";
71     protected static final String HOST4 = MY_HOST + "4";
72
73     protected static final String LEADER = HOST1;
74
75     protected static final String[] HOST_ARR3 = {HOST1, MY_HOST, HOST2};
76
77     protected static final BucketAssignments EMPTY_ASGN = new BucketAssignments();
78     protected static final BucketAssignments ASGN3 = new BucketAssignments(HOST_ARR3);
79
80     /**
81      * Scheduled tasks returned by schedule().
82      */
83     protected LinkedList<CancellableScheduledTask> onceSchedules;
84
85     /**
86      * Tasks captured via schedule().
87      */
88     protected LinkedList<Pair<Long, StateTimerTask>> onceTasks;
89
90     /**
91      * Scheduled tasks returned by scheduleWithFixedDelay().
92      */
93     protected LinkedList<CancellableScheduledTask> repeatedSchedules;
94
95     /**
96      * Tasks captured via scheduleWithFixedDelay().
97      */
98     protected LinkedList<Triple<Long, Long, StateTimerTask>> repeatedTasks;
99
100     /**
101      * Messages captured via publish().
102      */
103     protected LinkedList<Pair<String, Message>> published;
104
105     /**
106      * Messages captured via publishAdmin().
107      */
108     protected LinkedList<Message> admin;
109
110     protected PoolingManager mgr;
111     protected PoolingProperties props;
112     protected State prevState;
113
114     public SupportBasicStateTester() {
115         super();
116     }
117
118     /**
119      * Setup.
120      *
121      * @throws Exception throws exception
122      */
123     public void setUp() throws Exception {
124         onceSchedules = new LinkedList<>();
125         onceTasks = new LinkedList<>();
126
127         repeatedSchedules = new LinkedList<>();
128         repeatedTasks = new LinkedList<>();
129
130         published = new LinkedList<>();
131         admin = new LinkedList<>();
132
133         mgr = mock(PoolingManager.class);
134         props = mock(PoolingProperties.class);
135
136         when(mgr.getHost()).thenReturn(MY_HOST);
137         when(mgr.getTopic()).thenReturn(MY_TOPIC);
138         when(mgr.getProperties()).thenReturn(props);
139
140         when(props.getStartHeartbeatMs()).thenReturn(STD_HEARTBEAT_WAIT_MS);
141         when(props.getReactivateMs()).thenReturn(STD_REACTIVATE_WAIT_MS);
142         when(props.getIdentificationMs()).thenReturn(STD_IDENTIFICATION_MS);
143         when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
144         when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
145
146         prevState = new State(mgr) {
147             @Override
148             public Map<String, Object> getFilter() {
149                 throw new UnsupportedOperationException("cannot filter");
150             }
151         };
152
153         // capture publish() arguments
154         doAnswer(invocation -> {
155             Object[] args = invocation.getArguments();
156             published.add(Pair.of((String) args[0], (Message) args[1]));
157
158             return null;
159         }).when(mgr).publish(anyString(), any(Message.class));
160
161         // capture publishAdmin() arguments
162         doAnswer(invocation -> {
163             Object[] args = invocation.getArguments();
164             admin.add((Message) args[0]);
165
166             return null;
167         }).when(mgr).publishAdmin(any(Message.class));
168
169         // capture schedule() arguments, and return a new future
170         when(mgr.schedule(anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
171             Object[] args = invocation.getArguments();
172             onceTasks.add(Pair.of((Long) args[0], (StateTimerTask) args[1]));
173
174             CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
175             onceSchedules.add(sched);
176             return sched;
177         });
178
179         // capture scheduleWithFixedDelay() arguments, and return a new future
180         when(mgr.scheduleWithFixedDelay(anyLong(), anyLong(), any(StateTimerTask.class))).thenAnswer(invocation -> {
181             Object[] args = invocation.getArguments();
182             repeatedTasks.add(Triple.of((Long) args[0], (Long) args[1], (StateTimerTask) args[2]));
183
184             CancellableScheduledTask sched = mock(CancellableScheduledTask.class);
185             repeatedSchedules.add(sched);
186             return sched;
187         });
188
189         // get/set assignments in the manager
190         AtomicReference<BucketAssignments> asgn = new AtomicReference<>(ASGN3);
191
192         when(mgr.getAssignments()).thenAnswer(args -> asgn.get());
193
194         doAnswer(args -> {
195             asgn.set(args.getArgument(0));
196             return null;
197         }).when(mgr).startDistributing(any());
198     }
199
200     /**
201      * Makes a sorted set of hosts.
202      *
203      * @param hosts the hosts to be sorted
204      * @return the set of hosts, sorted
205      */
206     protected SortedSet<String> sortHosts(String... hosts) {
207         return new TreeSet<>(Arrays.asList(hosts));
208     }
209
210     /**
211      * Captures the host array from the Leader message published to the admin channel.
212      *
213      * @return the host array, as a list
214      */
215     protected List<String> captureHostList() {
216         return Arrays.asList(captureHostArray());
217     }
218
219     /**
220      * Captures the host array from the Leader message published to the admin channel.
221      *
222      * @return the host array
223      */
224     protected String[] captureHostArray() {
225         BucketAssignments asgn = captureAssignments();
226
227         String[] arr = asgn.getHostArray();
228         assertNotNull(arr);
229
230         return arr;
231     }
232
233     /**
234      * Captures the assignments from the Leader message published to the admin channel.
235      *
236      * @return the bucket assignments
237      */
238     protected BucketAssignments captureAssignments() {
239         Leader msg = captureAdminMessage(Leader.class);
240
241         BucketAssignments asgn = msg.getAssignments();
242         assertNotNull(asgn);
243         return asgn;
244     }
245
246     /**
247      * Captures the message published to the admin channel.
248      *
249      * @param clazz type of {@link Message} to capture
250      * @return the message that was published
251      */
252     protected <T extends Message> T captureAdminMessage(Class<T> clazz) {
253         return captureAdminMessage(clazz, 0);
254     }
255
256     /**
257      * Captures the message published to the admin channel.
258      *
259      * @param clazz type of {@link Message} to capture
260      * @param index index of the item to be captured
261      * @return the message that was published
262      */
263     protected <T extends Message> T captureAdminMessage(Class<T> clazz, int index) {
264         return clazz.cast(admin.get(index));
265     }
266
267     /**
268      * Captures the message published to the non-admin channels.
269      *
270      * @param clazz type of {@link Message} to capture
271      * @return the (channel,message) pair that was published
272      */
273     protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz) {
274         return capturePublishedMessage(clazz, 0);
275     }
276
277     /**
278      * Captures the message published to the non-admin channels.
279      *
280      * @param clazz type of {@link Message} to capture
281      * @param index index of the item to be captured
282      * @return the (channel,message) pair that was published
283      */
284     protected <T extends Message> Pair<String, T> capturePublishedMessage(Class<T> clazz, int index) {
285         Pair<String, Message> msg = published.get(index);
286         return Pair.of(msg.getLeft(), clazz.cast(msg.getRight()));
287     }
288 }