/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.Operator;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Partial implementation of an operator. In general, it's preferable that subclasses
 * override
 * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome)
 * startOperationAsync()}. However, if that proves to be too difficult, then they can
 * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome)
 * doOperation()}. In addition, if the operation requires any preprocessor steps, the
 * subclass may choose to override
 * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}.
 *
 * <p>The futures returned by the methods within this class can be canceled, and will
 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
 * returned by overridden methods will do the same. Of course, if a class overrides
 * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()},
 * then there's little that can be done to cancel that particular operation.
 */
61 public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
63 private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
66 * Executor to be used for tasks that may perform blocking I/O. The default executor
67 * simply launches a new thread for each command that is submitted to it.
69 * May be overridden by junit tests.
71 @Getter(AccessLevel.PROTECTED)
72 @Setter(AccessLevel.PROTECTED)
73 private Executor blockingExecutor = command -> {
74 Thread thread = new Thread(command);
75 thread.setDaemon(true);
80 private final String actorName;
83 private final String name;
/**
 * Constructs the object. The full name passed to the superclass is
 * {@code actorName + "." + name}.
 *
 * @param actorName name of the actor with which this operator is associated
 * @param name operation name
 */
public OperatorPartial(String actorName, String name) {
    super(actorName + "." + name);
    this.actorName = actorName;
    this.name = name;
}
98 * This method does nothing.
101 protected void doConfigure(Map<String, Object> parameters) {
106 * This method does nothing.
109 protected void doStart() {
114 * This method does nothing.
117 protected void doStop() {
122 * This method does nothing.
125 protected void doShutdown() {
130 public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) {
132 throw new IllegalStateException("operation is not running: " + getFullName());
135 // allocate a controller for the entire operation
136 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
138 CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params);
139 if (preproc == null) {
140 // no preprocessor required - just start the operation
141 return startOperationAttempt(params, controller, 1);
145 * Do preprocessor first and then, if successful, start the operation. Note:
146 * operations create their own outcome, ignoring the outcome from any previous
149 * Wrap the preprocessor to ensure "stop" is propagated to it.
152 controller.wrap(preproc)
153 .exceptionally(fromException(params, "preprocessor of operation"))
154 .thenCompose(handlePreprocessorFailure(params, controller))
155 .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1));
162 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
163 * invokes the call-backs, marks the controller complete, and returns an incomplete
164 * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
167 * Assumes that no callbacks have been invoked yet.
169 * @param params operation parameters
170 * @param controller pipeline controller
171 * @return a function that checks the outcome status and continues, if successful, or
172 * indicates a failure otherwise
174 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
175 ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) {
179 if (outcome != null && isSuccess(outcome)) {
180 logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
181 return CompletableFuture.completedFuture(outcome);
184 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
186 final Executor executor = params.getExecutor();
187 final CallbackManager callbacks = new CallbackManager();
189 // propagate "stop" to the callbacks
190 controller.add(callbacks);
192 final OperationOutcome outcome2 = params.makeOutcome();
194 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
196 outcome2.setResult(PolicyResult.FAILURE_GUARD);
197 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
200 CompletableFuture.completedFuture(outcome2)
201 .whenCompleteAsync(callbackStarted(params, callbacks), executor)
202 .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
203 .whenCompleteAsync(controller.delayedComplete(), executor);
206 return new CompletableFuture<>();
211 * Invokes the operation's preprocessor step(s) as a "future". This method simply
212 * returns {@code null}.
214 * This method assumes the following:
216 * <li>the operator is alive</li>
217 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
220 * @param params operation parameters
221 * @return a function that will start the preprocessor and returns its outcome, or
222 * {@code null} if this operation needs no preprocessor
224 protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
229 * Starts the operation attempt, with no preprocessor. When all retries complete, it
230 * will complete the controller.
232 * @param params operation parameters
233 * @param controller controller for all operation attempts
234 * @param attempt attempt number, typically starting with 1
235 * @return a future that will return the final result of all attempts
237 private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params,
238 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
240 // propagate "stop" to the operation attempt
241 controller.wrap(startAttemptWithoutRetries(params, attempt))
242 .thenCompose(retryOnFailure(params, controller, attempt));
248 * Starts the operation attempt, without doing any retries.
250 * @param params operation parameters
251 * @param attempt attempt number, typically starting with 1
252 * @return a future that will return the result of a single operation attempt
254 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params,
257 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
259 final Executor executor = params.getExecutor();
260 final OperationOutcome outcome = params.makeOutcome();
261 final CallbackManager callbacks = new CallbackManager();
263 // this operation attempt gets its own controller
264 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
266 // propagate "stop" to the callbacks
267 controller.add(callbacks);
270 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
271 .whenCompleteAsync(callbackStarted(params, callbacks), executor)
272 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2)));
275 // handle timeouts, if specified
276 long timeoutMillis = getTimeOutMillis(params.getTimeoutSec());
277 if (timeoutMillis > 0) {
278 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
279 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
283 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
284 * before callbackCompleted() is invoked.
286 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
287 * the pipeline as the last step anyway.
291 future.exceptionally(fromException(params, "operation"))
292 .thenApply(setRetryFlag(params, attempt))
293 .whenCompleteAsync(callbackStarted(params, callbacks), executor)
294 .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
295 .whenCompleteAsync(controller.delayedComplete(), executor);
302 * Determines if the outcome was successful.
304 * @param outcome outcome to examine
305 * @return {@code true} if the outcome was successful
307 protected boolean isSuccess(OperationOutcome outcome) {
308 return (outcome.getResult() == PolicyResult.SUCCESS);
312 * Determines if the outcome was a failure for this operator.
314 * @param outcome outcome to examine, or {@code null}
315 * @return {@code true} if the outcome is not {@code null} and was a failure
316 * <i>and</i> was associated with this operator, {@code false} otherwise
318 protected boolean isActorFailed(OperationOutcome outcome) {
319 return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
323 * Determines if the given outcome is for this operation.
325 * @param outcome outcome to examine
326 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
328 protected boolean isSameOperation(OperationOutcome outcome) {
329 return OperationOutcome.isFor(outcome, getActorName(), getName());
333 * Invokes the operation as a "future". This method simply invokes
334 * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor
335 * "blocking executor"}, returning the result via a "future".
337 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
338 * the executor in the "params", as that may bring the background thread pool to a
339 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
342 * This method assumes the following:
344 * <li>the operator is alive</li>
345 * <li>verifyRunning() has been invoked</li>
346 * <li>callbackStarted() has been invoked</li>
347 * <li>the invoker will perform appropriate timeout checks</li>
348 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
351 * @param params operation parameters
352 * @param attempt attempt number, typically starting with 1
353 * @return a function that will start the operation and return its result when
356 protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt,
357 OperationOutcome outcome) {
359 return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor());
363 * Low-level method that performs the operation. This can make the same assumptions
364 * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
365 * particular method simply throws an {@link UnsupportedOperationException}.
367 * @param params operation parameters
368 * @param attempt attempt number, typically starting with 1
369 * @param operation the operation being performed
370 * @return the outcome of the operation
372 protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) {
374 throw new UnsupportedOperationException("start operation " + getFullName());
378 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
379 * FAILURE, assuming the policy specifies retries and the retry count has been
382 * @param params operation parameters
383 * @param attempt latest attempt number, starting with 1
384 * @return a function to get the next future to execute
386 private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) {
388 return operation -> {
389 if (operation != null && !isActorFailed(operation)) {
391 * wrong type or wrong operation - just leave it as is. No need to log
392 * anything here, as retryOnFailure() will log a message
397 // get a non-null operation
398 OperationOutcome oper2;
399 if (operation != null) {
402 oper2 = params.makeOutcome();
403 oper2.setResult(PolicyResult.FAILURE);
406 Integer retry = params.getRetry();
407 if (retry != null && retry > 0 && attempt > retry) {
409 * retries were specified and we've already tried them all - change to
412 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
413 oper2.setResult(PolicyResult.FAILURE_RETRIES);
421 * Restarts the operation if it was a FAILURE. Assumes that
422 * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and
423 * thus that the "operation" is not {@code null}.
425 * @param params operation parameters
426 * @param controller controller for all of the retries
427 * @param attempt latest attempt number, starting with 1
428 * @return a function to get the next future to execute
430 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
431 ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller,
434 return operation -> {
435 if (!isActorFailed(operation)) {
436 // wrong type or wrong operation - just leave it as is
437 logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
438 controller.complete(operation);
439 return new CompletableFuture<>();
442 Integer retry = params.getRetry();
443 if (retry == null || retry <= 0) {
444 // no retries - already marked as FAILURE, so just return it
445 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
446 controller.complete(operation);
447 return new CompletableFuture<>();
452 * Retry the operation.
454 logger.info("retry operation {} for {}", getFullName(), params.getRequestId());
456 return startOperationAttempt(params, controller, attempt + 1);
461 * Converts an exception into an operation outcome, returning a copy of the outcome to
462 * prevent background jobs from changing it.
464 * @param params operation parameters
465 * @param type type of item throwing the exception
466 * @return a function that will convert an exception into an operation outcome
468 private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) {
471 OperationOutcome outcome = params.makeOutcome();
473 logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
474 params.getRequestId(), thrown);
476 return setOutcome(params, outcome, thrown);
481 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
482 * any outstanding futures when one completes.
484 * @param params operation parameters
485 * @param futures futures for which to wait
486 * @return a future to cancel or await an outcome. If this future is canceled, then
487 * all of the futures will be canceled
489 protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
490 List<CompletableFuture<OperationOutcome>> futures) {
492 // convert list to an array
493 @SuppressWarnings("rawtypes")
494 CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
496 @SuppressWarnings("unchecked")
497 CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures);
502 * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
503 * outstanding futures when one completes.
505 * @param params operation parameters
506 * @param futures futures for which to wait
507 * @return a future to cancel or await an outcome. If this future is canceled, then
508 * all of the futures will be canceled
510 protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
511 @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
513 final Executor executor = params.getExecutor();
514 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
516 attachFutures(controller, futures);
519 CompletableFuture.anyOf(futures)
520 .thenApply(object -> (OperationOutcome) object)
521 .whenCompleteAsync(controller.delayedComplete(), executor);
528 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
529 * the futures if returned future is canceled. The future returns the "worst" outcome,
530 * based on priority (see {@link #detmPriority(OperationOutcome)}).
532 * @param params operation parameters
533 * @param futures futures for which to wait
534 * @return a future to cancel or await an outcome. If this future is canceled, then
535 * all of the futures will be canceled
537 protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
538 List<CompletableFuture<OperationOutcome>> futures) {
540 // convert list to an array
541 @SuppressWarnings("rawtypes")
542 CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
544 @SuppressWarnings("unchecked")
545 CompletableFuture<OperationOutcome> result = allOf(params, arrFutures);
550 * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
551 * futures if returned future is canceled. The future returns the "worst" outcome,
552 * based on priority (see {@link #detmPriority(OperationOutcome)}).
554 * @param params operation parameters
555 * @param futures futures for which to wait
556 * @return a future to cancel or await an outcome. If this future is canceled, then
557 * all of the futures will be canceled
559 protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
560 @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
562 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
564 attachFutures(controller, futures);
566 OperationOutcome[] outcomes = new OperationOutcome[futures.length];
568 @SuppressWarnings("rawtypes")
569 CompletableFuture[] futures2 = new CompletableFuture[futures.length];
571 // record the outcomes of each future when it completes
572 for (int count = 0; count < futures2.length; ++count) {
573 final int count2 = count;
574 futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
577 CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes));
583 * Attaches the given futures to the controller.
585 * @param controller master controller for all of the futures
586 * @param futures futures to be attached to the controller
588 private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
589 @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
592 for (CompletableFuture<OperationOutcome> future : futures) {
593 controller.add(future);
598 * Combines the outcomes from a set of tasks.
600 * @param params operation parameters
601 * @param future future to be completed with the combined result
602 * @param outcomes outcomes to be examined
604 private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params,
605 CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) {
607 return (unused, thrown) -> {
608 if (thrown != null) {
609 future.completeExceptionally(thrown);
613 // identify the outcome with the highest priority
614 OperationOutcome outcome = outcomes[0];
615 int priority = detmPriority(outcome);
617 // start with "1", as we've already dealt with "0"
618 for (int count = 1; count < outcomes.length; ++count) {
619 OperationOutcome outcome2 = outcomes[count];
620 int priority2 = detmPriority(outcome2);
622 if (priority2 > priority) {
624 priority = priority2;
628 logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(),
629 (outcome == null ? null : outcome.getResult()), params.getRequestId());
631 future.complete(outcome);
636 * Determines the priority of an outcome based on its result.
638 * @param outcome outcome to examine, or {@code null}
639 * @return the outcome's priority
641 protected int detmPriority(OperationOutcome outcome) {
642 if (outcome == null) {
646 switch (outcome.getResult()) {
653 case FAILURE_RETRIES:
659 case FAILURE_TIMEOUT:
662 case FAILURE_EXCEPTION:
669 * Performs a task, after verifying that the controller is still running. Also checks
670 * that the previous outcome was successful, if specified.
672 * @param params operation parameters
673 * @param controller overall pipeline controller
674 * @param checkSuccess {@code true} to check the previous outcome, {@code false}
676 * @param outcome outcome of the previous task
677 * @param tasks tasks to be performed
678 * @return a function to perform the task. If everything checks out, then it returns
679 * the task's future. Otherwise, it returns an incomplete future and completes
680 * the controller instead.
683 protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params,
684 PipelineControllerFuture<OperationOutcome> controller,
685 boolean checkSuccess, OperationOutcome outcome,
686 CompletableFuture<OperationOutcome> task) {
689 if (checkSuccess && !isSuccess(outcome)) {
691 * must complete before canceling so that cancel() doesn't cause controller to
694 controller.complete(outcome);
696 return new CompletableFuture<>();
699 return controller.wrap(task);
703 * Performs a task, after verifying that the controller is still running. Also checks
704 * that the previous outcome was successful, if specified.
706 * @param params operation parameters
707 * @param controller overall pipeline controller
708 * @param checkSuccess {@code true} to check the previous outcome, {@code false}
710 * @param tasks tasks to be performed
711 * @return a function to perform the task. If everything checks out, then it returns
712 * the task's future. Otherwise, it returns an incomplete future and completes
713 * the controller instead.
716 protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params,
717 PipelineControllerFuture<OperationOutcome> controller,
718 boolean checkSuccess,
719 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
724 if (!controller.isRunning()) {
725 return new CompletableFuture<>();
728 if (checkSuccess && !isSuccess(outcome)) {
729 controller.complete(outcome);
730 return new CompletableFuture<>();
733 return controller.wrap(task.apply(outcome));
738 * Sets the start time of the operation and invokes the callback to indicate that the
739 * operation has started. Does nothing if the pipeline has been stopped.
741 * This assumes that the "outcome" is not {@code null}.
743 * @param params operation parameters
744 * @param callbacks used to determine if the start callback can be invoked
745 * @return a function that sets the start time and invokes the callback
747 private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params,
748 CallbackManager callbacks) {
750 return (outcome, thrown) -> {
752 if (callbacks.canStart()) {
753 // haven't invoked "start" callback yet
754 outcome.setStart(callbacks.getStartTime());
755 outcome.setEnd(null);
756 params.callbackStarted(outcome);
762 * Sets the end time of the operation and invokes the callback to indicate that the
763 * operation has completed. Does nothing if the pipeline has been stopped.
765 * This assumes that the "outcome" is not {@code null}.
767 * Note: the start time must be a reference rather than a plain value, because it's
768 * value must be gotten on-demand, when the returned function is executed at a later
771 * @param params operation parameters
772 * @param callbacks used to determine if the end callback can be invoked
773 * @return a function that sets the end time and invokes the callback
775 private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params,
776 CallbackManager callbacks) {
778 return (outcome, thrown) -> {
780 if (callbacks.canEnd()) {
781 outcome.setStart(callbacks.getStartTime());
782 outcome.setEnd(callbacks.getEndTime());
783 params.callbackCompleted(outcome);
789 * Sets an operation's outcome and message, based on a throwable.
791 * @param params operation parameters
792 * @param operation operation to be updated
793 * @return the updated operation
795 protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
797 PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
798 return setOutcome(params, operation, result);
802 * Sets an operation's outcome and default message based on the result.
804 * @param params operation parameters
805 * @param operation operation to be updated
806 * @param result result of the operation
807 * @return the updated operation
809 protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
810 PolicyResult result) {
811 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
812 operation.setResult(result);
813 operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
814 : ControlLoopOperation.FAILED_MSG);
820 * Determines if a throwable is due to a timeout.
822 * @param thrown throwable of interest
823 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
825 protected boolean isTimeout(Throwable thrown) {
826 if (thrown instanceof CompletionException) {
827 thrown = thrown.getCause();
830 return (thrown instanceof TimeoutException);
// these may be overridden by junit tests
836 * Gets the operation timeout. Subclasses may override this method to obtain the
837 * timeout in some other way (e.g., through configuration properties).
839 * @param timeoutSec timeout, in seconds, or {@code null}
840 * @return the operation timeout, in milliseconds
842 protected long getTimeOutMillis(Integer timeoutSec) {
843 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));