/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.controlloop.eventmanager;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.onap.policy.controlloop.ControlLoopTargetType;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.controlloop.actorserviceprovider.ActorService;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
import org.onap.policy.controlloop.actorserviceprovider.Operator;
import org.onap.policy.controlloop.actorserviceprovider.TargetType;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
import org.onap.policy.drools.domain.models.operational.OperationalTarget;
62 public class StepTest {
63 private static final UUID REQ_ID = UUID.randomUUID();
64 private static final String POLICY_ACTOR = "my-actor";
65 private static final String POLICY_OPERATION = "my-operation";
66 private static final String MY_TARGET = "my-target";
67 private static final String PAYLOAD_KEY = "payload-key";
68 private static final String PAYLOAD_VALUE = "payload-value";
69 private static final long REMAINING_MS = 5000;
70 private static final String EXPECTED_EXCEPTION = "expected exception";
73 private Operator policyOperator;
75 private Operation policyOperation;
77 private Actor policyActor;
79 private ActorService actors;
81 private CompletableFuture<OperationOutcome> future;
82 private OperationalTarget target;
83 private Map<String, String> entityIds;
84 private Map<String, String> payload;
85 private VirtualControlLoopEvent event;
86 private BlockingQueue<OperationOutcome> starts;
87 private BlockingQueue<OperationOutcome> completions;
88 private ControlLoopOperationParams params;
89 private AtomicReference<Instant> startTime;
97 MockitoAnnotations.initMocks(this);
99 future = new CompletableFuture<>();
101 // configure policy operation
102 when(actors.getActor(POLICY_ACTOR)).thenReturn(policyActor);
103 when(policyActor.getOperator(POLICY_OPERATION)).thenReturn(policyOperator);
104 when(policyOperator.buildOperation(any())).thenReturn(policyOperation);
105 when(policyOperation.start()).thenReturn(future);
107 entityIds = Map.of("entity-name-A", "entity-value-A");
109 target = OperationalTarget.builder()
110 .targetType(ControlLoopTargetType.VM)
111 .entityIds(entityIds)
114 payload = Map.of(PAYLOAD_KEY, PAYLOAD_VALUE);
116 event = new VirtualControlLoopEvent();
117 event.setRequestId(REQ_ID);
119 starts = new LinkedBlockingQueue<>();
120 completions = new LinkedBlockingQueue<>();
122 params = ControlLoopOperationParams.builder().actor(POLICY_ACTOR).actorService(actors)
123 .completeCallback(completions::add).executor(ForkJoinPool.commonPool())
124 .operation(POLICY_OPERATION).payload(new TreeMap<>(payload)).startCallback(starts::add)
125 .targetType(TargetType.valueOf(target.getTargetType())).targetEntityIds(target.getEntityIds())
126 .requestId(REQ_ID).targetEntity(MY_TARGET).build();
128 startTime = new AtomicReference<>();
130 step = new Step(params, startTime);
134 public void testConstructor() {
135 assertTrue(step.isPolicyStep());
136 assertSame(params, step.getParams());
137 assertNull(step.getParentStep());
139 // check that it recorded the startTime by starting and checking it
141 step.start(REMAINING_MS);
143 assertNotNull(startTime.get());
145 // try with null start time
146 assertThatThrownBy(() -> new Step(params, null)).isInstanceOf(NullPointerException.class)
147 .hasMessageContaining("startTime");
151 public void testConstructorWithOtherStep_testInitStartTime_testGetStartTimeRef() {
152 Step step2 = new Step(step, "actorB", "operB");
153 assertFalse(step2.isPolicyStep());
154 assertSame(step, step2.getParentStep());
156 ControlLoopOperationParams params2 = step2.getParams();
157 assertEquals("actorB", params2.getActor());
158 assertEquals("operB", params2.getOperation());
159 assertNull(params2.getRetry());
160 assertNull(params2.getTimeoutSec());
161 assertEquals(target.getTargetType().toString(), params2.getTargetType().toString());
162 assertSame(entityIds, params2.getTargetEntityIds());
163 assertEquals(MY_TARGET, params2.getTargetEntity());
164 assertTrue(params2.getPayload().isEmpty());
166 when(actors.getActor(params2.getActor())).thenReturn(policyActor);
167 when(policyActor.getOperator(params2.getOperation())).thenReturn(policyOperator);
169 assertNull(step2.getStartTime());
171 // check that it recorded the startTime by starting and checking it
173 step2.start(REMAINING_MS);
175 Instant instant = startTime.get();
176 assertNotNull(instant);
177 assertSame(instant, step2.getStartTime());
179 // launch the original step, too, so we can test the other branch of
182 step.start(REMAINING_MS);
184 assertSame(instant, startTime.get());
185 assertSame(instant, step.getStartTime());
189 public void testGetActorName_testGetOperationName() {
190 assertEquals(POLICY_ACTOR, step.getActorName());
191 assertEquals(POLICY_OPERATION, step.getOperationName());
195 public void testIsInitialized_testInit_testGetOperation() {
196 assertFalse(step.isInitialized());
198 // verify it's unchanged
199 assertFalse(step.isInitialized());
201 assertNull(step.getOperation());
205 assertSame(policyOperation, step.getOperation());
206 assertTrue(step.isInitialized());
208 // repeat - should be unchanged
210 assertSame(policyOperation, step.getOperation());
211 assertTrue(step.isInitialized());
213 // repeat without init - should be unchanged
214 assertSame(policyOperation, step.getOperation());
215 assertTrue(step.isInitialized());
219 public void testStart() {
220 assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS))
221 .withMessage("step has not been initialized");
223 // initialize it, by calling getOperation(), and then try again
225 assertTrue(step.start(REMAINING_MS));
227 assertNotNull(startTime.get());
229 // should fail if we try again
230 assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS))
231 .withMessage("step is already running");
235 * Tests start() when the operation.start() throws an exception.
238 public void testStartException() {
239 when(policyOperation.start()).thenThrow(new RuntimeException());
242 assertTrue(step.start(REMAINING_MS));
244 // exception should be immediate
245 OperationOutcome outcome = completions.poll();
246 assertNotNull(outcome);
248 assertNotEquals(OperationResult.SUCCESS, outcome.getResult());
249 assertEquals(POLICY_ACTOR, outcome.getActor());
250 assertTrue(outcome.isFinalOutcome());
254 * Tests start() when the operation throws an asynchronous exception.
257 public void testStartAsyncException() {
259 step.start(REMAINING_MS);
261 future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION));
263 // exception should be immediate
264 OperationOutcome outcome = completions.poll();
265 assertNotNull(outcome);
267 assertNotEquals(OperationResult.SUCCESS, outcome.getResult());
268 assertEquals(POLICY_ACTOR, outcome.getActor());
269 assertTrue(outcome.isFinalOutcome());
273 * Tests handleException() when the exception is a CancellationException.
276 public void testHandleExceptionCancellationException() {
278 step.start(REMAINING_MS);
280 future.completeExceptionally(new CancellationException(EXPECTED_EXCEPTION));
282 // should not have generated an outcome
283 assertNull(completions.peek());
287 public void testHandleExceptionCauseCancellationException() {
289 step.start(REMAINING_MS);
291 future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION, new CancellationException()));
293 // should not have generated an outcome
294 assertNull(completions.peek());
298 public void testHandleException() {
299 when(policyOperation.start()).thenThrow(new RuntimeException());
303 assertTrue(step.start(REMAINING_MS));
305 // exception should be immediate
306 OperationOutcome outcome = completions.poll();
307 assertNotNull(outcome);
309 assertNotEquals(OperationResult.SUCCESS, outcome.getResult());
310 assertEquals(POLICY_ACTOR, outcome.getActor());
311 assertTrue(outcome.isFinalOutcome());
312 assertEquals(POLICY_OPERATION, outcome.getOperation());
313 assertSame(startTime.get(), outcome.getStart());
314 assertNotNull(outcome.getEnd());
315 assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond());
319 public void testHandleTimeout() throws InterruptedException {
322 long tstart = System.currentTimeMillis();
324 // give it a short timeout
327 OperationOutcome outcome = completions.poll(5, TimeUnit.SECONDS);
328 assertNotNull(outcome);
330 // should not have timed out before 100ms
331 assertTrue(tstart + 100 <= System.currentTimeMillis());
333 // must wait for the future to complete before checking that it was cancelled
334 assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS)).isInstanceOf(Exception.class);
336 // verify that the future was cancelled
337 assertTrue(future.isCancelled());
339 assertNotEquals(OperationResult.SUCCESS, outcome.getResult());
340 assertEquals(ActorConstants.CL_TIMEOUT_ACTOR, outcome.getActor());
341 assertTrue(outcome.isFinalOutcome());
342 assertNull(outcome.getOperation());
343 assertSame(startTime.get(), outcome.getStart());
344 assertNotNull(outcome.getEnd());
345 assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond());
349 public void testCancel() {
350 // should have no effect
355 step.start(REMAINING_MS);
358 assertTrue(future.isCancelled());
362 public void testBuildOperation() {
363 assertSame(policyOperation, step.buildOperation());
367 public void testToString() {
368 assertNotNull(step.toString());