2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.eventmanager;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertNotEquals;
29 import static org.junit.jupiter.api.Assertions.assertNotNull;
30 import static org.junit.jupiter.api.Assertions.assertNull;
31 import static org.junit.jupiter.api.Assertions.assertSame;
32 import static org.junit.jupiter.api.Assertions.assertTrue;
33 import static org.mockito.ArgumentMatchers.any;
34 import static org.mockito.Mockito.mock;
35 import static org.mockito.Mockito.when;
37 import java.time.Instant;
39 import java.util.TreeMap;
40 import java.util.UUID;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.CancellationException;
43 import java.util.concurrent.CompletableFuture;
44 import java.util.concurrent.ForkJoinPool;
45 import java.util.concurrent.LinkedBlockingQueue;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Test;
50 import org.onap.policy.controlloop.ControlLoopTargetType;
51 import org.onap.policy.controlloop.VirtualControlLoopEvent;
52 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
53 import org.onap.policy.controlloop.actorserviceprovider.Operation;
54 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
57 import org.onap.policy.controlloop.actorserviceprovider.Operator;
58 import org.onap.policy.controlloop.actorserviceprovider.TargetType;
59 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
60 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
61 import org.onap.policy.drools.domain.models.operational.OperationalTarget;
64 private static final UUID REQ_ID = UUID.randomUUID();
65 private static final String POLICY_ACTOR = "my-actor";
66 private static final String POLICY_OPERATION = "my-operation";
67 private static final String MY_TARGET = "my-target";
68 private static final String PAYLOAD_KEY = "payload-key";
69 private static final String PAYLOAD_VALUE = "payload-value";
70 private static final long REMAINING_MS = 5000;
71 private static final String EXPECTED_EXCEPTION = "expected exception";
73 private final Operator policyOperator = mock(Operator.class);
74 private final Operation policyOperation = mock(Operation.class);
75 private final Actor policyActor = mock(Actor.class);
76 private final ActorService actors = mock(ActorService.class);
78 private CompletableFuture<OperationOutcome> future;
79 private OperationalTarget target;
80 private Map<String, String> entityIds;
81 private Map<String, String> payload;
82 private VirtualControlLoopEvent event;
83 private BlockingQueue<OperationOutcome> starts;
84 private BlockingQueue<OperationOutcome> completions;
85 private ControlLoopOperationParams params;
86 private AtomicReference<Instant> startTime;
94 future = new CompletableFuture<>();
96 // configure policy operation
97 when(actors.getActor(POLICY_ACTOR)).thenReturn(policyActor);
98 when(policyActor.getOperator(POLICY_OPERATION)).thenReturn(policyOperator);
99 when(policyOperator.buildOperation(any())).thenReturn(policyOperation);
100 when(policyOperation.start()).thenReturn(future);
101 when(policyOperation.getProperty(OperationProperties.AAI_TARGET_ENTITY)).thenReturn(MY_TARGET);
103 entityIds = Map.of("entity-name-A", "entity-value-A");
105 target = OperationalTarget.builder()
106 .targetType(ControlLoopTargetType.VM)
107 .entityIds(entityIds)
110 payload = Map.of(PAYLOAD_KEY, PAYLOAD_VALUE);
112 event = new VirtualControlLoopEvent();
113 event.setRequestId(REQ_ID);
115 starts = new LinkedBlockingQueue<>();
116 completions = new LinkedBlockingQueue<>();
118 params = ControlLoopOperationParams.builder().actor(POLICY_ACTOR).actorService(actors)
119 .completeCallback(completions::add).executor(ForkJoinPool.commonPool())
120 .operation(POLICY_OPERATION).payload(new TreeMap<>(payload)).startCallback(starts::add)
121 .targetType(TargetType.valueOf(target.getTargetType())).targetEntityIds(target.getEntityIds())
122 .requestId(REQ_ID).build();
124 startTime = new AtomicReference<>();
126 step = new Step(params, startTime);
130 void testConstructor() {
131 assertTrue(step.isPolicyStep());
132 assertSame(params, step.getParams());
133 assertNull(step.getParentStep());
135 // check that it recorded the startTime by starting and checking it
137 step.start(REMAINING_MS);
139 assertNotNull(startTime.get());
141 // try with null start time
142 assertThatThrownBy(() -> new Step(params, null)).isInstanceOf(NullPointerException.class)
143 .hasMessageContaining("startTime");
147 void testConstructorWithOtherStep_testInitStartTime_testGetStartTimeRef() {
148 var step2 = new Step(step, "actorB", "operB");
149 assertFalse(step2.isPolicyStep());
150 assertSame(step, step2.getParentStep());
152 var params2 = step2.getParams();
153 assertEquals("actorB", params2.getActor());
154 assertEquals("operB", params2.getOperation());
155 assertNull(params2.getRetry());
156 assertNull(params2.getTimeoutSec());
157 assertEquals(target.getTargetType().toString(), params2.getTargetType().toString());
158 assertSame(entityIds, params2.getTargetEntityIds());
159 assertTrue(params2.getPayload().isEmpty());
161 when(actors.getActor(params2.getActor())).thenReturn(policyActor);
162 when(policyActor.getOperator(params2.getOperation())).thenReturn(policyOperator);
164 assertNull(step2.getStartTime());
166 // check that it recorded the startTime by starting and checking it
168 step2.start(REMAINING_MS);
170 var instant = startTime.get();
171 assertNotNull(instant);
172 assertSame(instant, step2.getStartTime());
174 // launch the original step, too, so we can test the other branch of
177 step.start(REMAINING_MS);
179 assertSame(instant, startTime.get());
180 assertSame(instant, step.getStartTime());
184 void testGetActorName_testGetOperationName() {
185 assertEquals(POLICY_ACTOR, step.getActorName());
186 assertEquals(POLICY_OPERATION, step.getOperationName());
190 void testIsInitialized_testInit_testGetOperation() {
191 assertFalse(step.isInitialized());
193 // verify it's unchanged
194 assertFalse(step.isInitialized());
196 assertNull(step.getOperation());
200 assertSame(policyOperation, step.getOperation());
201 assertTrue(step.isInitialized());
203 // repeat - should be unchanged
205 assertSame(policyOperation, step.getOperation());
206 assertTrue(step.isInitialized());
208 // repeat without init - should be unchanged
209 assertSame(policyOperation, step.getOperation());
210 assertTrue(step.isInitialized());
215 assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS))
216 .withMessage("step has not been initialized");
218 // initialize it, by calling getOperation(), and then try again
220 assertTrue(step.start(REMAINING_MS));
222 assertNotNull(startTime.get());
224 // should fail if we try again
225 assertThatIllegalStateException().isThrownBy(() -> step.start(REMAINING_MS))
226 .withMessage("step is already running");
230 * Tests start() when the operation.start() throws an exception.
233 void testStartException() {
234 when(policyOperation.start()).thenThrow(new RuntimeException());
237 assertTrue(step.start(REMAINING_MS));
239 // exception should be immediate
240 var outcome = completions.poll();
241 assertNotNull(outcome);
243 assertNotEquals(OperationResult.SUCCESS, outcome.getResult());
244 assertEquals(POLICY_ACTOR, outcome.getActor());
245 assertTrue(outcome.isFinalOutcome());
249 * Tests start() when the operation throws an asynchronous exception.
252 void testStartAsyncException() {
254 step.start(REMAINING_MS);
256 future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION));
258 // exception should be immediate
259 var outcome = completions.poll();
260 assertNotNull(outcome);
262 assertNotEquals(OperationResult.SUCCESS, outcome.getResult());
263 assertEquals(POLICY_ACTOR, outcome.getActor());
264 assertTrue(outcome.isFinalOutcome());
268 * Tests handleException() when the exception is a CancellationException.
271 void testHandleExceptionCancellationException() {
273 step.start(REMAINING_MS);
275 future.completeExceptionally(new CancellationException(EXPECTED_EXCEPTION));
277 // should not have generated an outcome
278 assertNull(completions.peek());
282 void testHandleExceptionCauseCancellationException() {
284 step.start(REMAINING_MS);
286 future.completeExceptionally(new RuntimeException(EXPECTED_EXCEPTION, new CancellationException()));
288 // should not have generated an outcome
289 assertNull(completions.peek());
293 void testHandleException() {
294 when(policyOperation.start()).thenThrow(new RuntimeException());
298 assertTrue(step.start(REMAINING_MS));
300 // exception should be immediate
301 var outcome = completions.poll();
302 assertNotNull(outcome);
304 assertNotEquals(OperationResult.SUCCESS, outcome.getResult());
305 assertEquals(POLICY_ACTOR, outcome.getActor());
306 assertTrue(outcome.isFinalOutcome());
307 assertEquals(POLICY_OPERATION, outcome.getOperation());
308 assertSame(startTime.get(), outcome.getStart());
309 assertNotNull(outcome.getEnd());
310 assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond());
314 void testHandleTimeout() throws InterruptedException {
317 long tstart = System.currentTimeMillis();
319 // give it a short timeout
322 var outcome = completions.poll(5, TimeUnit.SECONDS);
323 assertNotNull(outcome);
325 // should not have timed out before 100ms
326 assertTrue(tstart + 100 <= System.currentTimeMillis());
328 // must wait for the future to complete before checking that it was cancelled
329 assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS)).isInstanceOf(Exception.class);
331 // verify that the future was cancelled
332 assertTrue(future.isCancelled());
334 assertNotEquals(OperationResult.SUCCESS, outcome.getResult());
335 assertEquals(ActorConstants.CL_TIMEOUT_ACTOR, outcome.getActor());
336 assertTrue(outcome.isFinalOutcome());
337 assertNull(outcome.getOperation());
338 assertSame(startTime.get(), outcome.getStart());
339 assertNotNull(outcome.getEnd());
340 assertTrue(outcome.getEnd().getEpochSecond() >= startTime.get().getEpochSecond());
345 // should have no effect
350 step.start(REMAINING_MS);
353 assertTrue(future.isCancelled());
357 void testBuildOperation() {
358 assertSame(policyOperation, step.buildOperation());
362 void testMakeOutcome() {
364 assertEquals(MY_TARGET, step.makeOutcome().getTarget());
368 void testToString() {
369 assertNotNull(step.toString());