2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.time.Instant;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.CompletionException;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.concurrent.atomic.AtomicReference;
31 import java.util.function.Function;
33 import org.onap.policy.controlloop.ControlLoopOperation;
34 import org.onap.policy.controlloop.actorserviceprovider.Operator;
35 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
36 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
37 import org.onap.policy.controlloop.policy.Policy;
38 import org.onap.policy.controlloop.policy.PolicyResult;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Partial implementation of an operator. Subclasses can choose to simply implement
44 * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override
45 * {@link #doOperationAsFuture(ControlLoopOperationParams)}.
47 public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
49 private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
51 private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString();
52 private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString();
53 private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString();
56 private final String actorName;
59 private final String name;
62 * Constructs the object.
64 * @param actorName name of the actor with which this operator is associated
65 * @param name operation name
67 public OperatorPartial(String actorName, String name) {
68 super(actorName + "." + name);
69 this.actorName = actorName;
74 * This method does nothing.
77 protected void doConfigure(Map<String, Object> parameters) {
82 * This method does nothing.
85 protected void doStart() {
90 * This method does nothing.
93 protected void doStop() {
98 * This method does nothing.
101 protected void doShutdown() {
106 public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) {
108 throw new IllegalStateException("operation is not running: " + getFullName());
111 final Executor executor = params.getExecutor();
113 // allocate a controller for the entire operation
114 final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
116 CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params);
117 if (preproc == null) {
118 // no preprocessor required - just start the operation
119 return startOperationAttempt(params, controller, 1);
122 // propagate "stop" to the preprocessor
123 controller.add(preproc);
126 * Do preprocessor first and then, if successful, start the operation. Note:
127 * operations create their own outcome, ignoring the outcome from any previous
130 preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor)
131 .thenComposeAsync(handleFailure(params, controller), executor)
132 .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)),
139 * Starts an operation's preprocessor step(s). If the preprocessor fails, then it
140 * invokes the started and completed call-backs.
142 * @param params operation parameters
143 * @return a future that will return the preprocessor outcome, or {@code null} if this
144 * operation needs no preprocessor
146 protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) {
147 logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId());
149 final Executor executor = params.getExecutor();
150 final ControlLoopOperation operation = params.makeOutcome();
152 final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
153 doPreprocessorAsFuture(params);
154 if (preproc == null) {
155 // no preprocessor required
159 // allocate a controller for the preprocessor steps
160 final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
163 * Don't mark it complete until we've built the whole pipeline. This will prevent
164 * the operation from starting until after it has been successfully built (i.e.,
165 * without generating any exceptions).
167 final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
171 .thenComposeAsync(controller.add(preproc), executor)
172 .exceptionally(fromException(params, operation))
173 .whenCompleteAsync(controller.delayedComplete(), executor);
176 // start the pipeline
177 firstFuture.complete(operation);
183 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
184 * invokes the call-backs and returns a failed outcome. Otherwise, it returns the
185 * outcome that it received.
187 * @param params operation parameters
188 * @param controller pipeline controller
189 * @return a function that checks the outcome status and continues, if successful, or
190 * indicates a failure otherwise
192 private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure(
193 ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) {
197 if (outcome != null && isSuccess(outcome)) {
198 logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
199 return CompletableFuture.completedFuture(outcome);
202 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
204 final Executor executor = params.getExecutor();
205 final CallbackManager callbacks = new CallbackManager();
207 // propagate "stop" to the callbacks
208 controller.add(callbacks);
210 final ControlLoopOperation outcome2 = params.makeOutcome();
212 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
214 outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString());
215 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
217 CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor)
218 .thenApplyAsync(callbackCompleted(params, callbacks), executor)
219 .whenCompleteAsync(controller.delayedRemove(callbacks), executor)
220 .whenCompleteAsync(controller.delayedComplete(), executor);
227 * Invokes the operation's preprocessor step(s) as a "future". This method simply
228 * returns {@code null}.
230 * This method assumes the following:
232 * <li>the operator is alive</li>
233 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
236 * @param params operation parameters
237 * @return a function that will start the preprocessor and returns its outcome, or
238 * {@code null} if this operation needs no preprocessor
240 protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
241 ControlLoopOperationParams params) {
246 * Starts the operation attempt, with no preprocessor. When all retries complete, it
247 * will complete the controller.
249 * @param params operation parameters
250 * @param controller controller for all operation attempts
251 * @param attempt attempt number, typically starting with 1
252 * @return a future that will return the final result of all attempts
254 private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params,
255 PipelineControllerFuture<ControlLoopOperation> controller, int attempt) {
257 final Executor executor = params.getExecutor();
259 CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt);
261 // propagate "stop" to the operation attempt
262 controller.add(future);
264 // detach when complete
265 future.whenCompleteAsync(controller.delayedRemove(future), executor)
266 .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor())
267 .whenCompleteAsync(controller.delayedComplete(), executor);
273 * Starts the operation attempt, without doing any retries.
275 * @param params operation parameters
276 * @param attempt attempt number, typically starting with 1
277 * @return a future that will return the result of a single operation attempt
279 private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params,
282 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
284 final Executor executor = params.getExecutor();
285 final ControlLoopOperation outcome = params.makeOutcome();
286 final CallbackManager callbacks = new CallbackManager();
288 // this operation attempt gets its own controller
289 final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
291 // propagate "stop" to the callbacks
292 controller.add(callbacks);
295 * Don't mark it complete until we've built the whole pipeline. This will prevent
296 * the operation from starting until after it has been successfully built (i.e.,
297 * without generating any exceptions).
299 final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
302 CompletableFuture<ControlLoopOperation> future2 =
303 firstFuture.thenComposeAsync(verifyRunning(controller, params), executor)
304 .thenApplyAsync(callbackStarted(params, callbacks), executor)
305 .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor);
308 // handle timeouts, if specified
309 long timeoutMillis = getTimeOutMillis(params.getPolicy());
310 if (timeoutMillis > 0) {
311 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
312 future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
316 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
317 * before callbackCompleted() is invoked.
319 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
320 * the pipeline as the last step anyway.
324 future2.exceptionally(fromException(params, outcome))
325 .thenApplyAsync(setRetryFlag(params, attempt), executor)
326 .thenApplyAsync(callbackStarted(params, callbacks), executor)
327 .thenApplyAsync(callbackCompleted(params, callbacks), executor)
328 .whenCompleteAsync(controller.delayedComplete(), executor);
331 // start the pipeline
332 firstFuture.complete(outcome);
338 * Determines if the outcome was successful.
340 * @param outcome outcome to examine
341 * @return {@code true} if the outcome was successful
343 protected boolean isSuccess(ControlLoopOperation outcome) {
344 return OUTCOME_SUCCESS.equals(outcome.getOutcome());
348 * Determines if the outcome was a failure for this operator.
350 * @param outcome outcome to examine, or {@code null}
351 * @return {@code true} if the outcome is not {@code null} and was a failure
352 * <i>and</i> was associated with this operator, {@code false} otherwise
354 protected boolean isActorFailed(ControlLoopOperation outcome) {
355 return OUTCOME_FAILURE.equals(getActorOutcome(outcome));
359 * Invokes the operation as a "future". This method simply invokes
360 * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future".
362 * This method assumes the following:
364 * <li>the operator is alive</li>
365 * <li>verifyRunning() has been invoked</li>
366 * <li>callbackStarted() has been invoked</li>
367 * <li>the invoker will perform appropriate timeout checks</li>
368 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
371 * @param params operation parameters
372 * @param attempt attempt number, typically starting with 1
373 * @return a function that will start the operation and return its result when
376 protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
377 ControlLoopOperationParams params, int attempt) {
380 * TODO As doOperation() may perform blocking I/O, this should be launched in its
381 * own thread to prevent the ForkJoinPool from being tied up. Should probably
382 * provide a method to make that easy.
385 return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation),
386 params.getExecutor());
390 * Low-level method that performs the operation. This can make the same assumptions
391 * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
392 * method throws an {@link UnsupportedOperationException}.
394 * @param params operation parameters
395 * @param attempt attempt number, typically starting with 1
396 * @param operation the operation being performed
397 * @return the outcome of the operation
399 protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
400 ControlLoopOperation operation) {
402 throw new UnsupportedOperationException("start operation " + getFullName());
406 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
407 * FAILURE, assuming the policy specifies retries and the retry count has been
410 * @param params operation parameters
411 * @param attempt latest attempt number, starting with 1
412 * @return a function to get the next future to execute
414 private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params,
417 return operation -> {
418 if (operation != null && !isActorFailed(operation)) {
420 * wrong type or wrong operation - just leave it as is. No need to log
421 * anything here, as retryOnFailure() will log a message
426 // get a non-null operation
427 ControlLoopOperation oper2;
428 if (operation != null) {
431 oper2 = params.makeOutcome();
432 oper2.setOutcome(OUTCOME_FAILURE);
435 if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0
436 && attempt > params.getPolicy().getRetry()) {
438 * retries were specified and we've already tried them all - change to
441 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
442 oper2.setOutcome(OUTCOME_RETRIES);
450 * Restarts the operation if it was a FAILURE. Assumes that
451 * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and
452 * thus that the "operation" is not {@code null}.
454 * @param params operation parameters
455 * @param controller controller for all of the retries
456 * @param attempt latest attempt number, starting with 1
457 * @return a function to get the next future to execute
459 private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure(
460 ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller,
463 return operation -> {
464 if (!isActorFailed(operation)) {
465 // wrong type or wrong operation - just leave it as is
466 logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
467 return CompletableFuture.completedFuture(operation);
470 if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) {
471 // no retries - already marked as FAILURE, so just return it
472 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
473 return CompletableFuture.completedFuture(operation);
478 * Retry the operation.
480 logger.info("retry operation {} for {}", getFullName(), params.getRequestId());
482 return startOperationAttempt(params, controller, attempt + 1);
487 * Gets the outcome of an operation for this operation.
489 * @param operation operation whose outcome is to be extracted
490 * @return the outcome of the given operation, if it's for this operator, {@code null}
493 protected String getActorOutcome(ControlLoopOperation operation) {
494 if (operation == null) {
498 if (!getActorName().equals(operation.getActor())) {
502 if (!getName().equals(operation.getOperation())) {
506 return operation.getOutcome();
510 * Gets a function that will start the next step, if the current operation was
511 * successful, or just return the current operation, otherwise.
513 * @param params operation parameters
514 * @param nextStep function that will invoke the next step, passing it the operation
515 * @return a function that will start the next step
517 protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess(
518 ControlLoopOperationParams params,
519 Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) {
521 return operation -> {
523 if (operation == null) {
524 logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId());
525 ControlLoopOperation outcome = params.makeOutcome();
526 outcome.setOutcome(OUTCOME_FAILURE);
527 return CompletableFuture.completedFuture(outcome);
529 } else if (isSuccess(operation)) {
530 logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId());
531 return nextStep.apply(operation);
534 logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId());
535 return CompletableFuture.completedFuture(operation);
541 * Converts an exception into an operation outcome, returning a copy of the outcome to
542 * prevent background jobs from changing it.
544 * @param params operation parameters
545 * @param operation current operation
546 * @return a function that will convert an exception into an operation outcome
548 private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params,
549 ControlLoopOperation operation) {
552 logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(),
553 params.getRequestId(), thrown);
556 * Must make a copy of the operation, as the original could be changed by
557 * background jobs that might still be running.
559 return setOutcome(params, new ControlLoopOperation(operation), thrown);
564 * Gets a function to verify that the operation is still running. If the pipeline is
565 * not running, then it returns an incomplete future, which will effectively halt
566 * subsequent operations in the pipeline. This method is intended to be used with one
567 * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods.
569 * @param controller pipeline controller
570 * @param params operation parameters
571 * @return a function to verify that the operation is still running
573 protected <T> Function<T, CompletableFuture<T>> verifyRunning(
574 PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) {
577 boolean running = controller.isRunning();
578 logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId());
580 return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>());
585 * Sets the start time of the operation and invokes the callback to indicate that the
586 * operation has started. Does nothing if the pipeline has been stopped.
588 * This assumes that the "outcome" is not {@code null}.
590 * @param params operation parameters
591 * @param callbacks used to determine if the start callback can be invoked
592 * @return a function that sets the start time and invokes the callback
594 private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params,
595 CallbackManager callbacks) {
599 if (callbacks.canStart()) {
600 // haven't invoked "start" callback yet
601 outcome.setStart(callbacks.getStartTime());
602 outcome.setEnd(null);
603 params.callbackStarted(outcome);
611 * Sets the end time of the operation and invokes the callback to indicate that the
612 * operation has completed. Does nothing if the pipeline has been stopped.
614 * This assumes that the "outcome" is not {@code null}.
616 * Note: the start time must be a reference rather than a plain value, because it's
617 * value must be gotten on-demand, when the returned function is executed at a later
620 * @param params operation parameters
621 * @param callbacks used to determine if the end callback can be invoked
622 * @return a function that sets the end time and invokes the callback
624 private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params,
625 CallbackManager callbacks) {
627 return operation -> {
629 if (callbacks.canEnd()) {
630 operation.setStart(callbacks.getStartTime());
631 operation.setEnd(callbacks.getEndTime());
632 params.callbackCompleted(operation);
640 * Sets an operation's outcome and message, based on a throwable.
642 * @param params operation parameters
643 * @param operation operation to be updated
644 * @return the updated operation
646 protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
648 PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
649 return setOutcome(params, operation, result);
653 * Sets an operation's outcome and default message based on the result.
655 * @param params operation parameters
656 * @param operation operation to be updated
657 * @param result result of the operation
658 * @return the updated operation
660 protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
661 PolicyResult result) {
662 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
663 operation.setOutcome(result.toString());
664 operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
665 : ControlLoopOperation.FAILED_MSG);
671 * Determines if a throwable is due to a timeout.
673 * @param thrown throwable of interest
674 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
676 protected boolean isTimeout(Throwable thrown) {
677 if (thrown instanceof CompletionException) {
678 thrown = thrown.getCause();
681 return (thrown instanceof TimeoutException);
684 // these may be overridden by junit tests
687 * Gets the operation timeout. Subclasses may override this method to obtain the
688 * timeout in some other way (e.g., through configuration properties).
690 * @param policy policy from which to extract the timeout
691 * @return the operation timeout, in milliseconds
693 protected long getTimeOutMillis(Policy policy) {
694 Integer timeoutSec = policy.getTimeout();
695 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
699 * Manager for "start" and "end" callbacks.
701 private static class CallbackManager implements Runnable {
702 private final AtomicReference<Instant> startTime = new AtomicReference<>();
703 private final AtomicReference<Instant> endTime = new AtomicReference<>();
706 * Determines if the "start" callback can be invoked. If so, it sets the
707 * {@link #startTime} to the current time.
709 * @return {@code true} if the "start" callback can be invoked, {@code false}
712 public boolean canStart() {
713 return startTime.compareAndSet(null, Instant.now());
717 * Determines if the "end" callback can be invoked. If so, it sets the
718 * {@link #endTime} to the current time.
720 * @return {@code true} if the "end" callback can be invoked, {@code false}
723 public boolean canEnd() {
724 return endTime.compareAndSet(null, Instant.now());
728 * Gets the start time.
730 * @return the start time, or {@code null} if {@link #canStart()} has not been
733 public Instant getStartTime() {
734 return startTime.get();
740 * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
743 public Instant getEndTime() {
744 return endTime.get();
748 * Prevents further callbacks from being executed by setting {@link #startTime}
749 * and {@link #endTime}.