Add more code to facilitate actor implementation
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.CompletionException;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import java.util.function.BiConsumer;
30 import java.util.function.Function;
31 import org.onap.policy.controlloop.ControlLoopOperation;
32 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
33 import org.onap.policy.controlloop.actorserviceprovider.Operation;
34 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
35 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
36 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
37 import org.onap.policy.controlloop.policy.PolicyResult;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Partial implementation of an operator. In general, it's preferable that subclasses
43  * would override {@link #startOperationAsync(int, OperationOutcome)
44  * startOperationAsync()}. However, if that proves to be too difficult, then they can
45  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
46  * if the operation requires any preprocessor steps, the subclass may choose to override
47  * {@link #startPreprocessorAsync()}.
48  * <p/>
49  * The futures returned by the methods within this class can be canceled, and will
50  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
51  * returned by overridden methods will do the same. Of course, if a class overrides
52  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
53  * be done to cancel that particular operation.
54  */
55 public abstract class OperationPartial implements Operation {
56
57     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
58     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
59
60     // values extracted from the operator
61
62     private final OperatorPartial operator;
63
64     /**
65      * Operation parameters.
66      */
67     protected final ControlLoopOperationParams params;
68
69
70     /**
71      * Constructs the object.
72      *
73      * @param params operation parameters
74      * @param operator operator that created this operation
75      */
76     public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
77         this.params = params;
78         this.operator = operator;
79     }
80
81     public Executor getBlockingExecutor() {
82         return operator.getBlockingExecutor();
83     }
84
85     public String getFullName() {
86         return operator.getFullName();
87     }
88
89     public String getActorName() {
90         return operator.getActorName();
91     }
92
93     public String getName() {
94         return operator.getName();
95     }
96
97     @Override
98     public final CompletableFuture<OperationOutcome> start() {
99         if (!operator.isAlive()) {
100             throw new IllegalStateException("operation is not running: " + getFullName());
101         }
102
103         // allocate a controller for the entire operation
104         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
105
106         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
107         if (preproc == null) {
108             // no preprocessor required - just start the operation
109             return startOperationAttempt(controller, 1);
110         }
111
112         /*
113          * Do preprocessor first and then, if successful, start the operation. Note:
114          * operations create their own outcome, ignoring the outcome from any previous
115          * steps.
116          *
117          * Wrap the preprocessor to ensure "stop" is propagated to it.
118          */
119         // @formatter:off
120         controller.wrap(preproc)
121                         .exceptionally(fromException("preprocessor of operation"))
122                         .thenCompose(handlePreprocessorFailure(controller))
123                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
124                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
125         // @formatter:on
126
127         return controller;
128     }
129
130     /**
131      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
132      * invokes the call-backs, marks the controller complete, and returns an incomplete
133      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
134      * received.
135      * <p/>
136      * Assumes that no callbacks have been invoked yet.
137      *
138      * @param controller pipeline controller
139      * @return a function that checks the outcome status and continues, if successful, or
140      *         indicates a failure otherwise
141      */
142     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
143                     PipelineControllerFuture<OperationOutcome> controller) {
144
145         return outcome -> {
146
147             if (outcome != null && isSuccess(outcome)) {
148                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
149                 return CompletableFuture.completedFuture(outcome);
150             }
151
152             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
153
154             final Executor executor = params.getExecutor();
155             final CallbackManager callbacks = new CallbackManager();
156
157             // propagate "stop" to the callbacks
158             controller.add(callbacks);
159
160             final OperationOutcome outcome2 = params.makeOutcome();
161
162             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
163
164             outcome2.setResult(PolicyResult.FAILURE_GUARD);
165             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
166
167             // @formatter:off
168             CompletableFuture.completedFuture(outcome2)
169                             .whenCompleteAsync(callbackStarted(callbacks), executor)
170                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
171                             .whenCompleteAsync(controller.delayedComplete(), executor);
172             // @formatter:on
173
174             return new CompletableFuture<>();
175         };
176     }
177
178     /**
179      * Invokes the operation's preprocessor step(s) as a "future". This method simply
180      * invokes {@link #startGuardAsync()}.
181      * <p/>
182      * This method assumes the following:
183      * <ul>
184      * <li>the operator is alive</li>
185      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
186      * </ul>
187      *
188      * @return a function that will start the preprocessor and returns its outcome, or
189      *         {@code null} if this operation needs no preprocessor
190      */
191     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
192         return startGuardAsync();
193     }
194
195     /**
196      * Invokes the operation's guard step(s) as a "future". This method simply returns
197      * {@code null}.
198      * <p/>
199      * This method assumes the following:
200      * <ul>
201      * <li>the operator is alive</li>
202      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
203      * </ul>
204      *
205      * @return a function that will start the guard checks and returns its outcome, or
206      *         {@code null} if this operation has no guard
207      */
208     protected CompletableFuture<OperationOutcome> startGuardAsync() {
209         return null;
210     }
211
212     /**
213      * Starts the operation attempt, with no preprocessor. When all retries complete, it
214      * will complete the controller.
215      *
216      * @param controller controller for all operation attempts
217      * @param attempt attempt number, typically starting with 1
218      * @return a future that will return the final result of all attempts
219      */
220     private CompletableFuture<OperationOutcome> startOperationAttempt(
221                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
222
223         // propagate "stop" to the operation attempt
224         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
225                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
226
227         return controller;
228     }
229
230     /**
231      * Starts the operation attempt, without doing any retries.
232      *
233      * @param params operation parameters
234      * @param attempt attempt number, typically starting with 1
235      * @return a future that will return the result of a single operation attempt
236      */
237     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
238
239         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
240
241         final Executor executor = params.getExecutor();
242         final OperationOutcome outcome = params.makeOutcome();
243         final CallbackManager callbacks = new CallbackManager();
244
245         // this operation attempt gets its own controller
246         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
247
248         // propagate "stop" to the callbacks
249         controller.add(callbacks);
250
251         // @formatter:off
252         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
253                         .whenCompleteAsync(callbackStarted(callbacks), executor)
254                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
255         // @formatter:on
256
257         // handle timeouts, if specified
258         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
259         if (timeoutMillis > 0) {
260             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
261             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
262         }
263
264         /*
265          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
266          * before callbackCompleted() is invoked.
267          *
268          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
269          * the pipeline as the last step anyway.
270          */
271
272         // @formatter:off
273         future.exceptionally(fromException("operation"))
274                     .thenApply(setRetryFlag(attempt))
275                     .whenCompleteAsync(callbackStarted(callbacks), executor)
276                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
277                     .whenCompleteAsync(controller.delayedComplete(), executor);
278         // @formatter:on
279
280         return controller;
281     }
282
283     /**
284      * Determines if the outcome was successful.
285      *
286      * @param outcome outcome to examine
287      * @return {@code true} if the outcome was successful
288      */
289     protected boolean isSuccess(OperationOutcome outcome) {
290         return (outcome.getResult() == PolicyResult.SUCCESS);
291     }
292
293     /**
294      * Determines if the outcome was a failure for this operator.
295      *
296      * @param outcome outcome to examine, or {@code null}
297      * @return {@code true} if the outcome is not {@code null} and was a failure
298      *         <i>and</i> was associated with this operator, {@code false} otherwise
299      */
300     protected boolean isActorFailed(OperationOutcome outcome) {
301         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
302     }
303
304     /**
305      * Determines if the given outcome is for this operation.
306      *
307      * @param outcome outcome to examine
308      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
309      */
310     protected boolean isSameOperation(OperationOutcome outcome) {
311         return OperationOutcome.isFor(outcome, getActorName(), getName());
312     }
313
314     /**
315      * Invokes the operation as a "future". This method simply invokes
316      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
317      * returning the result via a "future".
318      * <p/>
319      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
320      * the executor in the "params", as that may bring the background thread pool to a
321      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
322      * instead.
323      * <p/>
324      * This method assumes the following:
325      * <ul>
326      * <li>the operator is alive</li>
327      * <li>verifyRunning() has been invoked</li>
328      * <li>callbackStarted() has been invoked</li>
329      * <li>the invoker will perform appropriate timeout checks</li>
330      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
331      * </ul>
332      *
333      * @param attempt attempt number, typically starting with 1
334      * @return a function that will start the operation and return its result when
335      *         complete
336      */
337     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
338
339         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
340     }
341
342     /**
343      * Low-level method that performs the operation. This can make the same assumptions
344      * that are made by {@link #doOperationAsFuture()}. This particular method simply
345      * throws an {@link UnsupportedOperationException}.
346      *
347      * @param attempt attempt number, typically starting with 1
348      * @param operation the operation being performed
349      * @return the outcome of the operation
350      */
351     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
352
353         throw new UnsupportedOperationException("start operation " + getFullName());
354     }
355
356     /**
357      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
358      * FAILURE, assuming the policy specifies retries and the retry count has been
359      * exhausted.
360      *
361      * @param attempt latest attempt number, starting with 1
362      * @return a function to get the next future to execute
363      */
364     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
365
366         return operation -> {
367             if (operation != null && !isActorFailed(operation)) {
368                 /*
369                  * wrong type or wrong operation - just leave it as is. No need to log
370                  * anything here, as retryOnFailure() will log a message
371                  */
372                 return operation;
373             }
374
375             // get a non-null operation
376             OperationOutcome oper2;
377             if (operation != null) {
378                 oper2 = operation;
379             } else {
380                 oper2 = params.makeOutcome();
381                 oper2.setResult(PolicyResult.FAILURE);
382             }
383
384             int retry = getRetry(params.getRetry());
385             if (retry > 0 && attempt > retry) {
386                 /*
387                  * retries were specified and we've already tried them all - change to
388                  * FAILURE_RETRIES
389                  */
390                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
391                 oper2.setResult(PolicyResult.FAILURE_RETRIES);
392             }
393
394             return oper2;
395         };
396     }
397
398     /**
399      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
400      * was previously invoked, and thus that the "operation" is not {@code null}.
401      *
402      * @param controller controller for all of the retries
403      * @param attempt latest attempt number, starting with 1
404      * @return a function to get the next future to execute
405      */
406     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
407                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
408
409         return operation -> {
410             if (!isActorFailed(operation)) {
411                 // wrong type or wrong operation - just leave it as is
412                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
413                 controller.complete(operation);
414                 return new CompletableFuture<>();
415             }
416
417             if (getRetry(params.getRetry()) <= 0) {
418                 // no retries - already marked as FAILURE, so just return it
419                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
420                 controller.complete(operation);
421                 return new CompletableFuture<>();
422             }
423
424             /*
425              * Retry the operation.
426              */
427             long waitMs = getRetryWaitMs();
428             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
429
430             return sleep(waitMs, TimeUnit.MILLISECONDS)
431                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
432         };
433     }
434
435     /**
436      * Convenience method that starts a sleep(), running via a future.
437      *
438      * @param sleepTime time to sleep
439      * @param unit time unit
440      * @return a future that will complete when the sleep completes
441      */
442     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
443         if (sleepTime <= 0) {
444             return CompletableFuture.completedFuture(null);
445         }
446
447         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
448     }
449
450     /**
451      * Converts an exception into an operation outcome, returning a copy of the outcome to
452      * prevent background jobs from changing it.
453      *
454      * @param type type of item throwing the exception
455      * @return a function that will convert an exception into an operation outcome
456      */
457     private Function<Throwable, OperationOutcome> fromException(String type) {
458
459         return thrown -> {
460             OperationOutcome outcome = params.makeOutcome();
461
462             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
463                             params.getRequestId(), thrown);
464
465             return setOutcome(outcome, thrown);
466         };
467     }
468
469     /**
470      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
471      * any outstanding futures when one completes.
472      *
473      * @param futures futures for which to wait
474      * @return a future to cancel or await an outcome. If this future is canceled, then
475      *         all of the futures will be canceled
476      */
477     protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) {
478
479         // convert list to an array
480         @SuppressWarnings("rawtypes")
481         CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
482
483         @SuppressWarnings("unchecked")
484         CompletableFuture<OperationOutcome> result = anyOf(arrFutures);
485         return result;
486     }
487
488     /**
489      * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
490      * outstanding futures when one completes.
491      *
492      * @param futures futures for which to wait
493      * @return a future to cancel or await an outcome. If this future is canceled, then
494      *         all of the futures will be canceled
495      */
496     protected CompletableFuture<OperationOutcome> anyOf(
497                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
498
499         if (futures.length == 1) {
500             return futures[0];
501         }
502
503         final Executor executor = params.getExecutor();
504         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
505
506         attachFutures(controller, futures);
507
508         // @formatter:off
509         CompletableFuture.anyOf(futures)
510                             .thenApply(object -> (OperationOutcome) object)
511                             .whenCompleteAsync(controller.delayedComplete(), executor);
512         // @formatter:on
513
514         return controller;
515     }
516
517     /**
518      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
519      * the futures if returned future is canceled. The future returns the "worst" outcome,
520      * based on priority (see {@link #detmPriority(OperationOutcome)}).
521      *
522      * @param futures futures for which to wait
523      * @return a future to cancel or await an outcome. If this future is canceled, then
524      *         all of the futures will be canceled
525      */
526     protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) {
527
528         // convert list to an array
529         @SuppressWarnings("rawtypes")
530         CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
531
532         @SuppressWarnings("unchecked")
533         CompletableFuture<OperationOutcome> result = allOf(arrFutures);
534         return result;
535     }
536
537     /**
538      * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
539      * futures if returned future is canceled. The future returns the "worst" outcome,
540      * based on priority (see {@link #detmPriority(OperationOutcome)}).
541      *
542      * @param futures futures for which to wait
543      * @return a future to cancel or await an outcome. If this future is canceled, then
544      *         all of the futures will be canceled
545      */
546     protected CompletableFuture<OperationOutcome> allOf(
547                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
548
549         if (futures.length == 1) {
550             return futures[0];
551         }
552
553         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
554
555         attachFutures(controller, futures);
556
557         OperationOutcome[] outcomes = new OperationOutcome[futures.length];
558
559         @SuppressWarnings("rawtypes")
560         CompletableFuture[] futures2 = new CompletableFuture[futures.length];
561
562         // record the outcomes of each future when it completes
563         for (int count = 0; count < futures2.length; ++count) {
564             final int count2 = count;
565             futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
566         }
567
568         // @formatter:off
569         CompletableFuture.allOf(futures2)
570                         .thenApply(unused -> combineOutcomes(outcomes))
571                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
572         // @formatter:on
573
574         return controller;
575     }
576
577     /**
578      * Attaches the given futures to the controller.
579      *
580      * @param controller master controller for all of the futures
581      * @param futures futures to be attached to the controller
582      */
583     private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
584                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
585
586         if (futures.length == 0) {
587             throw new IllegalArgumentException("empty list of futures");
588         }
589
590         // attach each task
591         for (CompletableFuture<OperationOutcome> future : futures) {
592             controller.add(future);
593         }
594     }
595
596     /**
597      * Combines the outcomes from a set of tasks.
598      *
599      * @param outcomes outcomes to be examined
600      * @return the combined outcome
601      */
602     private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) {
603
604         // identify the outcome with the highest priority
605         OperationOutcome outcome = outcomes[0];
606         int priority = detmPriority(outcome);
607
608         // start with "1", as we've already dealt with "0"
609         for (int count = 1; count < outcomes.length; ++count) {
610             OperationOutcome outcome2 = outcomes[count];
611             int priority2 = detmPriority(outcome2);
612
613             if (priority2 > priority) {
614                 outcome = outcome2;
615                 priority = priority2;
616             }
617         }
618
619         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
620                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
621
622         return outcome;
623     }
624
625     /**
626      * Determines the priority of an outcome based on its result.
627      *
628      * @param outcome outcome to examine, or {@code null}
629      * @return the outcome's priority
630      */
631     protected int detmPriority(OperationOutcome outcome) {
632         if (outcome == null || outcome.getResult() == null) {
633             return 1;
634         }
635
636         switch (outcome.getResult()) {
637             case SUCCESS:
638                 return 0;
639
640             case FAILURE_GUARD:
641                 return 2;
642
643             case FAILURE_RETRIES:
644                 return 3;
645
646             case FAILURE:
647                 return 4;
648
649             case FAILURE_TIMEOUT:
650                 return 5;
651
652             case FAILURE_EXCEPTION:
653             default:
654                 return 6;
655         }
656     }
657
658     /**
659      * Performs a task, after verifying that the controller is still running. Also checks
660      * that the previous outcome was successful, if specified.
661      *
662      * @param controller overall pipeline controller
663      * @param checkSuccess {@code true} to check the previous outcome, {@code false}
664      *        otherwise
665      * @param outcome outcome of the previous task
666      * @param task task to be performed
667      * @return the task, if everything checks out. Otherwise, it returns an incomplete
668      *         future and completes the controller instead
669      */
670     // @formatter:off
671     protected CompletableFuture<OperationOutcome> doTask(
672                     PipelineControllerFuture<OperationOutcome> controller,
673                     boolean checkSuccess, OperationOutcome outcome,
674                     CompletableFuture<OperationOutcome> task) {
675         // @formatter:on
676
677         if (checkSuccess && !isSuccess(outcome)) {
678             /*
679              * must complete before canceling so that cancel() doesn't cause controller to
680              * complete
681              */
682             controller.complete(outcome);
683             task.cancel(false);
684             return new CompletableFuture<>();
685         }
686
687         return controller.wrap(task);
688     }
689
690     /**
691      * Performs a task, after verifying that the controller is still running. Also checks
692      * that the previous outcome was successful, if specified.
693      *
694      * @param controller overall pipeline controller
695      * @param checkSuccess {@code true} to check the previous outcome, {@code false}
696      *        otherwise
697      * @param task function to start the task to be performed
698      * @return a function to perform the task. If everything checks out, then it returns
699      *         the task. Otherwise, it returns an incomplete future and completes the
700      *         controller instead
701      */
702     // @formatter:off
703     protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(
704                     PipelineControllerFuture<OperationOutcome> controller,
705                     boolean checkSuccess,
706                     Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
707         // @formatter:on
708
709         return outcome -> {
710
711             if (!controller.isRunning()) {
712                 return new CompletableFuture<>();
713             }
714
715             if (checkSuccess && !isSuccess(outcome)) {
716                 controller.complete(outcome);
717                 return new CompletableFuture<>();
718             }
719
720             return controller.wrap(task.apply(outcome));
721         };
722     }
723
724     /**
725      * Sets the start time of the operation and invokes the callback to indicate that the
726      * operation has started. Does nothing if the pipeline has been stopped.
727      * <p/>
728      * This assumes that the "outcome" is not {@code null}.
729      *
730      * @param callbacks used to determine if the start callback can be invoked
731      * @return a function that sets the start time and invokes the callback
732      */
733     private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
734
735         return (outcome, thrown) -> {
736
737             if (callbacks.canStart()) {
738                 // haven't invoked "start" callback yet
739                 outcome.setStart(callbacks.getStartTime());
740                 outcome.setEnd(null);
741                 params.callbackStarted(outcome);
742             }
743         };
744     }
745
746     /**
747      * Sets the end time of the operation and invokes the callback to indicate that the
748      * operation has completed. Does nothing if the pipeline has been stopped.
749      * <p/>
750      * This assumes that the "outcome" is not {@code null}.
751      * <p/>
752      * Note: the start time must be a reference rather than a plain value, because it's
753      * value must be gotten on-demand, when the returned function is executed at a later
754      * time.
755      *
756      * @param callbacks used to determine if the end callback can be invoked
757      * @return a function that sets the end time and invokes the callback
758      */
759     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
760
761         return (outcome, thrown) -> {
762
763             if (callbacks.canEnd()) {
764                 outcome.setStart(callbacks.getStartTime());
765                 outcome.setEnd(callbacks.getEndTime());
766                 params.callbackCompleted(outcome);
767             }
768         };
769     }
770
771     /**
772      * Sets an operation's outcome and message, based on a throwable.
773      *
774      * @param operation operation to be updated
775      * @return the updated operation
776      */
777     protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
778         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
779         return setOutcome(operation, result);
780     }
781
782     /**
783      * Sets an operation's outcome and default message based on the result.
784      *
785      * @param operation operation to be updated
786      * @param result result of the operation
787      * @return the updated operation
788      */
789     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
790         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
791         operation.setResult(result);
792         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
793                         : ControlLoopOperation.FAILED_MSG);
794
795         return operation;
796     }
797
798     /**
799      * Determines if a throwable is due to a timeout.
800      *
801      * @param thrown throwable of interest
802      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
803      */
804     protected boolean isTimeout(Throwable thrown) {
805         if (thrown instanceof CompletionException) {
806             thrown = thrown.getCause();
807         }
808
809         return (thrown instanceof TimeoutException);
810     }
811
812     // these may be overridden by subclasses or junit tests
813
814     /**
815      * Gets the retry count.
816      *
817      * @param retry retry, extracted from the parameters, or {@code null}
818      * @return the number of retries, or {@code 0} if no retries were specified
819      */
820     protected int getRetry(Integer retry) {
821         return (retry == null ? 0 : retry);
822     }
823
824     /**
825      * Gets the retry wait, in milliseconds.
826      *
827      * @return the retry wait, in milliseconds
828      */
829     protected long getRetryWaitMs() {
830         return DEFAULT_RETRY_WAIT_MS;
831     }
832
833     /**
834      * Gets the operation timeout.
835      *
836      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
837      *        {@code null}
838      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
839      *         specified
840      */
841     protected long getTimeoutMs(Integer timeoutSec) {
842         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
843     }
844 }