df5258d717babe0f1d6c5102d21e442c16ed5390
[policy/models.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.CompletionException;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.function.BiConsumer;
31 import java.util.function.Function;
32 import lombok.AccessLevel;
33 import lombok.Getter;
34 import lombok.Setter;
35 import org.onap.policy.controlloop.ControlLoopOperation;
36 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
37 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
38 import org.onap.policy.controlloop.actorserviceprovider.Operator;
39 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
40 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
41 import org.onap.policy.controlloop.policy.PolicyResult;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Partial implementation of an operator. In general, it's preferable that subclasses
47  * would override
48  * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome)
49  * startOperationAsync()}. However, if that proves to be too difficult, then they can
50  * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome)
51  * doOperation()}. In addition, if the operation requires any preprocessor steps, the
52  * subclass may choose to override
53  * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}.
54  * <p/>
55  * The futures returned by the methods within this class can be canceled, and will
56  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
57  * returned by overridden methods will do the same. Of course, if a class overrides
58  * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()},
59  * then there's little that can be done to cancel that particular operation.
60  */
61 public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
62
63     private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
64
65     /**
66      * Executor to be used for tasks that may perform blocking I/O. The default executor
67      * simply launches a new thread for each command that is submitted to it.
68      * <p/>
69      * May be overridden by junit tests.
70      */
71     @Getter(AccessLevel.PROTECTED)
72     @Setter(AccessLevel.PROTECTED)
73     private Executor blockingExecutor = command -> {
74         Thread thread = new Thread(command);
75         thread.setDaemon(true);
76         thread.start();
77     };
78
79     @Getter
80     private final String actorName;
81
82     @Getter
83     private final String name;
84
85     /**
86      * Constructs the object.
87      *
88      * @param actorName name of the actor with which this operator is associated
89      * @param name operation name
90      */
91     public OperatorPartial(String actorName, String name) {
92         super(actorName + "." + name);
93         this.actorName = actorName;
94         this.name = name;
95     }
96
97     /**
98      * This method does nothing.
99      */
100     @Override
101     protected void doConfigure(Map<String, Object> parameters) {
102         // do nothing
103     }
104
105     /**
106      * This method does nothing.
107      */
108     @Override
109     protected void doStart() {
110         // do nothing
111     }
112
113     /**
114      * This method does nothing.
115      */
116     @Override
117     protected void doStop() {
118         // do nothing
119     }
120
121     /**
122      * This method does nothing.
123      */
124     @Override
125     protected void doShutdown() {
126         // do nothing
127     }
128
129     @Override
130     public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) {
131         if (!isAlive()) {
132             throw new IllegalStateException("operation is not running: " + getFullName());
133         }
134
135         // allocate a controller for the entire operation
136         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
137
138         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params);
139         if (preproc == null) {
140             // no preprocessor required - just start the operation
141             return startOperationAttempt(params, controller, 1);
142         }
143
144         /*
145          * Do preprocessor first and then, if successful, start the operation. Note:
146          * operations create their own outcome, ignoring the outcome from any previous
147          * steps.
148          *
149          * Wrap the preprocessor to ensure "stop" is propagated to it.
150          */
151         // @formatter:off
152         controller.wrap(preproc)
153                         .exceptionally(fromException(params, "preprocessor of operation"))
154                         .thenCompose(handlePreprocessorFailure(params, controller))
155                         .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1));
156         // @formatter:on
157
158         return controller;
159     }
160
161     /**
162      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
163      * invokes the call-backs, marks the controller complete, and returns an incomplete
164      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
165      * received.
166      * <p/>
167      * Assumes that no callbacks have been invoked yet.
168      *
169      * @param params operation parameters
170      * @param controller pipeline controller
171      * @return a function that checks the outcome status and continues, if successful, or
172      *         indicates a failure otherwise
173      */
174     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
175                     ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) {
176
177         return outcome -> {
178
179             if (outcome != null && isSuccess(outcome)) {
180                 logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
181                 return CompletableFuture.completedFuture(outcome);
182             }
183
184             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
185
186             final Executor executor = params.getExecutor();
187             final CallbackManager callbacks = new CallbackManager();
188
189             // propagate "stop" to the callbacks
190             controller.add(callbacks);
191
192             final OperationOutcome outcome2 = params.makeOutcome();
193
194             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
195
196             outcome2.setResult(PolicyResult.FAILURE_GUARD);
197             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
198
199             // @formatter:off
200             CompletableFuture.completedFuture(outcome2)
201                             .whenCompleteAsync(callbackStarted(params, callbacks), executor)
202                             .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
203                             .whenCompleteAsync(controller.delayedComplete(), executor);
204             // @formatter:on
205
206             return new CompletableFuture<>();
207         };
208     }
209
210     /**
211      * Invokes the operation's preprocessor step(s) as a "future". This method simply
212      * returns {@code null}.
213      * <p/>
214      * This method assumes the following:
215      * <ul>
216      * <li>the operator is alive</li>
217      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
218      * </ul>
219      *
220      * @param params operation parameters
221      * @return a function that will start the preprocessor and returns its outcome, or
222      *         {@code null} if this operation needs no preprocessor
223      */
224     protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
225         return null;
226     }
227
228     /**
229      * Starts the operation attempt, with no preprocessor. When all retries complete, it
230      * will complete the controller.
231      *
232      * @param params operation parameters
233      * @param controller controller for all operation attempts
234      * @param attempt attempt number, typically starting with 1
235      * @return a future that will return the final result of all attempts
236      */
237     private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params,
238                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
239
240         // propagate "stop" to the operation attempt
241         controller.wrap(startAttemptWithoutRetries(params, attempt))
242                         .thenCompose(retryOnFailure(params, controller, attempt));
243
244         return controller;
245     }
246
247     /**
248      * Starts the operation attempt, without doing any retries.
249      *
250      * @param params operation parameters
251      * @param attempt attempt number, typically starting with 1
252      * @return a future that will return the result of a single operation attempt
253      */
254     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params,
255                     int attempt) {
256
257         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
258
259         final Executor executor = params.getExecutor();
260         final OperationOutcome outcome = params.makeOutcome();
261         final CallbackManager callbacks = new CallbackManager();
262
263         // this operation attempt gets its own controller
264         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
265
266         // propagate "stop" to the callbacks
267         controller.add(callbacks);
268
269         // @formatter:off
270         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
271                         .whenCompleteAsync(callbackStarted(params, callbacks), executor)
272                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2)));
273         // @formatter:on
274
275         // handle timeouts, if specified
276         long timeoutMillis = getTimeOutMillis(params.getTimeoutSec());
277         if (timeoutMillis > 0) {
278             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
279             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
280         }
281
282         /*
283          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
284          * before callbackCompleted() is invoked.
285          *
286          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
287          * the pipeline as the last step anyway.
288          */
289
290         // @formatter:off
291         future.exceptionally(fromException(params, "operation"))
292                     .thenApply(setRetryFlag(params, attempt))
293                     .whenCompleteAsync(callbackStarted(params, callbacks), executor)
294                     .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
295                     .whenCompleteAsync(controller.delayedComplete(), executor);
296         // @formatter:on
297
298         return controller;
299     }
300
301     /**
302      * Determines if the outcome was successful.
303      *
304      * @param outcome outcome to examine
305      * @return {@code true} if the outcome was successful
306      */
307     protected boolean isSuccess(OperationOutcome outcome) {
308         return (outcome.getResult() == PolicyResult.SUCCESS);
309     }
310
311     /**
312      * Determines if the outcome was a failure for this operator.
313      *
314      * @param outcome outcome to examine, or {@code null}
315      * @return {@code true} if the outcome is not {@code null} and was a failure
316      *         <i>and</i> was associated with this operator, {@code false} otherwise
317      */
318     protected boolean isActorFailed(OperationOutcome outcome) {
319         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
320     }
321
322     /**
323      * Determines if the given outcome is for this operation.
324      *
325      * @param outcome outcome to examine
326      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
327      */
328     protected boolean isSameOperation(OperationOutcome outcome) {
329         return OperationOutcome.isFor(outcome, getActorName(), getName());
330     }
331
332     /**
333      * Invokes the operation as a "future". This method simply invokes
334      * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor
335      * "blocking executor"}, returning the result via a "future".
336      * <p/>
337      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
338      * the executor in the "params", as that may bring the background thread pool to a
339      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
340      * instead.
341      * <p/>
342      * This method assumes the following:
343      * <ul>
344      * <li>the operator is alive</li>
345      * <li>verifyRunning() has been invoked</li>
346      * <li>callbackStarted() has been invoked</li>
347      * <li>the invoker will perform appropriate timeout checks</li>
348      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
349      * </ul>
350      *
351      * @param params operation parameters
352      * @param attempt attempt number, typically starting with 1
353      * @return a function that will start the operation and return its result when
354      *         complete
355      */
356     protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt,
357                     OperationOutcome outcome) {
358
359         return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor());
360     }
361
362     /**
363      * Low-level method that performs the operation. This can make the same assumptions
364      * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
365      * particular method simply throws an {@link UnsupportedOperationException}.
366      *
367      * @param params operation parameters
368      * @param attempt attempt number, typically starting with 1
369      * @param operation the operation being performed
370      * @return the outcome of the operation
371      */
372     protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) {
373
374         throw new UnsupportedOperationException("start operation " + getFullName());
375     }
376
377     /**
378      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
379      * FAILURE, assuming the policy specifies retries and the retry count has been
380      * exhausted.
381      *
382      * @param params operation parameters
383      * @param attempt latest attempt number, starting with 1
384      * @return a function to get the next future to execute
385      */
386     private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) {
387
388         return operation -> {
389             if (operation != null && !isActorFailed(operation)) {
390                 /*
391                  * wrong type or wrong operation - just leave it as is. No need to log
392                  * anything here, as retryOnFailure() will log a message
393                  */
394                 return operation;
395             }
396
397             // get a non-null operation
398             OperationOutcome oper2;
399             if (operation != null) {
400                 oper2 = operation;
401             } else {
402                 oper2 = params.makeOutcome();
403                 oper2.setResult(PolicyResult.FAILURE);
404             }
405
406             Integer retry = params.getRetry();
407             if (retry != null && retry > 0 && attempt > retry) {
408                 /*
409                  * retries were specified and we've already tried them all - change to
410                  * FAILURE_RETRIES
411                  */
412                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
413                 oper2.setResult(PolicyResult.FAILURE_RETRIES);
414             }
415
416             return oper2;
417         };
418     }
419
420     /**
421      * Restarts the operation if it was a FAILURE. Assumes that
422      * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and
423      * thus that the "operation" is not {@code null}.
424      *
425      * @param params operation parameters
426      * @param controller controller for all of the retries
427      * @param attempt latest attempt number, starting with 1
428      * @return a function to get the next future to execute
429      */
430     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
431                     ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller,
432                     int attempt) {
433
434         return operation -> {
435             if (!isActorFailed(operation)) {
436                 // wrong type or wrong operation - just leave it as is
437                 logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
438                 controller.complete(operation);
439                 return new CompletableFuture<>();
440             }
441
442             Integer retry = params.getRetry();
443             if (retry == null || retry <= 0) {
444                 // no retries - already marked as FAILURE, so just return it
445                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
446                 controller.complete(operation);
447                 return new CompletableFuture<>();
448             }
449
450
451             /*
452              * Retry the operation.
453              */
454             logger.info("retry operation {} for {}", getFullName(), params.getRequestId());
455
456             return startOperationAttempt(params, controller, attempt + 1);
457         };
458     }
459
460     /**
461      * Converts an exception into an operation outcome, returning a copy of the outcome to
462      * prevent background jobs from changing it.
463      *
464      * @param params operation parameters
465      * @param type type of item throwing the exception
466      * @return a function that will convert an exception into an operation outcome
467      */
468     private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) {
469
470         return thrown -> {
471             OperationOutcome outcome = params.makeOutcome();
472
473             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
474                             params.getRequestId(), thrown);
475
476             return setOutcome(params, outcome, thrown);
477         };
478     }
479
480     /**
481      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
482      * any outstanding futures when one completes.
483      *
484      * @param params operation parameters
485      * @param futures futures for which to wait
486      * @return a future to cancel or await an outcome. If this future is canceled, then
487      *         all of the futures will be canceled
488      */
489     protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
490                     List<CompletableFuture<OperationOutcome>> futures) {
491
492         // convert list to an array
493         @SuppressWarnings("rawtypes")
494         CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
495
496         @SuppressWarnings("unchecked")
497         CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures);
498         return result;
499     }
500
501     /**
502      * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
503      * outstanding futures when one completes.
504      *
505      * @param params operation parameters
506      * @param futures futures for which to wait
507      * @return a future to cancel or await an outcome. If this future is canceled, then
508      *         all of the futures will be canceled
509      */
510     protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
511                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
512
513         final Executor executor = params.getExecutor();
514         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
515
516         attachFutures(controller, futures);
517
518         // @formatter:off
519         CompletableFuture.anyOf(futures)
520                             .thenApply(object -> (OperationOutcome) object)
521                             .whenCompleteAsync(controller.delayedComplete(), executor);
522         // @formatter:on
523
524         return controller;
525     }
526
527     /**
528      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
529      * the futures if returned future is canceled. The future returns the "worst" outcome,
530      * based on priority (see {@link #detmPriority(OperationOutcome)}).
531      *
532      * @param params operation parameters
533      * @param futures futures for which to wait
534      * @return a future to cancel or await an outcome. If this future is canceled, then
535      *         all of the futures will be canceled
536      */
537     protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
538                     List<CompletableFuture<OperationOutcome>> futures) {
539
540         // convert list to an array
541         @SuppressWarnings("rawtypes")
542         CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
543
544         @SuppressWarnings("unchecked")
545         CompletableFuture<OperationOutcome> result = allOf(params, arrFutures);
546         return result;
547     }
548
549     /**
550      * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
551      * futures if returned future is canceled. The future returns the "worst" outcome,
552      * based on priority (see {@link #detmPriority(OperationOutcome)}).
553      *
554      * @param params operation parameters
555      * @param futures futures for which to wait
556      * @return a future to cancel or await an outcome. If this future is canceled, then
557      *         all of the futures will be canceled
558      */
559     protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
560                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
561
562         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
563
564         attachFutures(controller, futures);
565
566         OperationOutcome[] outcomes = new OperationOutcome[futures.length];
567
568         @SuppressWarnings("rawtypes")
569         CompletableFuture[] futures2 = new CompletableFuture[futures.length];
570
571         // record the outcomes of each future when it completes
572         for (int count = 0; count < futures2.length; ++count) {
573             final int count2 = count;
574             futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
575         }
576
577         CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes));
578
579         return controller;
580     }
581
582     /**
583      * Attaches the given futures to the controller.
584      *
585      * @param controller master controller for all of the futures
586      * @param futures futures to be attached to the controller
587      */
588     private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
589                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
590
591         // attach each task
592         for (CompletableFuture<OperationOutcome> future : futures) {
593             controller.add(future);
594         }
595     }
596
597     /**
598      * Combines the outcomes from a set of tasks.
599      *
600      * @param params operation parameters
601      * @param future future to be completed with the combined result
602      * @param outcomes outcomes to be examined
603      */
604     private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params,
605                     CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) {
606
607         return (unused, thrown) -> {
608             if (thrown != null) {
609                 future.completeExceptionally(thrown);
610                 return;
611             }
612
613             // identify the outcome with the highest priority
614             OperationOutcome outcome = outcomes[0];
615             int priority = detmPriority(outcome);
616
617             // start with "1", as we've already dealt with "0"
618             for (int count = 1; count < outcomes.length; ++count) {
619                 OperationOutcome outcome2 = outcomes[count];
620                 int priority2 = detmPriority(outcome2);
621
622                 if (priority2 > priority) {
623                     outcome = outcome2;
624                     priority = priority2;
625                 }
626             }
627
628             logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(),
629                             (outcome == null ? null : outcome.getResult()), params.getRequestId());
630
631             future.complete(outcome);
632         };
633     }
634
635     /**
636      * Determines the priority of an outcome based on its result.
637      *
638      * @param outcome outcome to examine, or {@code null}
639      * @return the outcome's priority
640      */
641     protected int detmPriority(OperationOutcome outcome) {
642         if (outcome == null) {
643             return 1;
644         }
645
646         switch (outcome.getResult()) {
647             case SUCCESS:
648                 return 0;
649
650             case FAILURE_GUARD:
651                 return 2;
652
653             case FAILURE_RETRIES:
654                 return 3;
655
656             case FAILURE:
657                 return 4;
658
659             case FAILURE_TIMEOUT:
660                 return 5;
661
662             case FAILURE_EXCEPTION:
663             default:
664                 return 6;
665         }
666     }
667
668     /**
669      * Performs a task, after verifying that the controller is still running. Also checks
670      * that the previous outcome was successful, if specified.
671      *
672      * @param params operation parameters
673      * @param controller overall pipeline controller
674      * @param checkSuccess {@code true} to check the previous outcome, {@code false}
675      *        otherwise
676      * @param outcome outcome of the previous task
677      * @param tasks tasks to be performed
678      * @return a function to perform the task. If everything checks out, then it returns
679      *         the task's future. Otherwise, it returns an incomplete future and completes
680      *         the controller instead.
681      */
682     // @formatter:off
683     protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params,
684                     PipelineControllerFuture<OperationOutcome> controller,
685                     boolean checkSuccess, OperationOutcome outcome,
686                     CompletableFuture<OperationOutcome> task) {
687         // @formatter:on
688
689         if (checkSuccess && !isSuccess(outcome)) {
690             /*
691              * must complete before canceling so that cancel() doesn't cause controller to
692              * complete
693              */
694             controller.complete(outcome);
695             task.cancel(false);
696             return new CompletableFuture<>();
697         }
698
699         return controller.wrap(task);
700     }
701
702     /**
703      * Performs a task, after verifying that the controller is still running. Also checks
704      * that the previous outcome was successful, if specified.
705      *
706      * @param params operation parameters
707      * @param controller overall pipeline controller
708      * @param checkSuccess {@code true} to check the previous outcome, {@code false}
709      *        otherwise
710      * @param tasks tasks to be performed
711      * @return a function to perform the task. If everything checks out, then it returns
712      *         the task's future. Otherwise, it returns an incomplete future and completes
713      *         the controller instead.
714      */
715     // @formatter:off
716     protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params,
717                     PipelineControllerFuture<OperationOutcome> controller,
718                     boolean checkSuccess,
719                     Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
720         // @formatter:on
721
722         return outcome -> {
723
724             if (!controller.isRunning()) {
725                 return new CompletableFuture<>();
726             }
727
728             if (checkSuccess && !isSuccess(outcome)) {
729                 controller.complete(outcome);
730                 return new CompletableFuture<>();
731             }
732
733             return controller.wrap(task.apply(outcome));
734         };
735     }
736
737     /**
738      * Sets the start time of the operation and invokes the callback to indicate that the
739      * operation has started. Does nothing if the pipeline has been stopped.
740      * <p/>
741      * This assumes that the "outcome" is not {@code null}.
742      *
743      * @param params operation parameters
744      * @param callbacks used to determine if the start callback can be invoked
745      * @return a function that sets the start time and invokes the callback
746      */
747     private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params,
748                     CallbackManager callbacks) {
749
750         return (outcome, thrown) -> {
751
752             if (callbacks.canStart()) {
753                 // haven't invoked "start" callback yet
754                 outcome.setStart(callbacks.getStartTime());
755                 outcome.setEnd(null);
756                 params.callbackStarted(outcome);
757             }
758         };
759     }
760
761     /**
762      * Sets the end time of the operation and invokes the callback to indicate that the
763      * operation has completed. Does nothing if the pipeline has been stopped.
764      * <p/>
765      * This assumes that the "outcome" is not {@code null}.
766      * <p/>
767      * Note: the start time must be a reference rather than a plain value, because it's
768      * value must be gotten on-demand, when the returned function is executed at a later
769      * time.
770      *
771      * @param params operation parameters
772      * @param callbacks used to determine if the end callback can be invoked
773      * @return a function that sets the end time and invokes the callback
774      */
775     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params,
776                     CallbackManager callbacks) {
777
778         return (outcome, thrown) -> {
779
780             if (callbacks.canEnd()) {
781                 outcome.setStart(callbacks.getStartTime());
782                 outcome.setEnd(callbacks.getEndTime());
783                 params.callbackCompleted(outcome);
784             }
785         };
786     }
787
788     /**
789      * Sets an operation's outcome and message, based on a throwable.
790      *
791      * @param params operation parameters
792      * @param operation operation to be updated
793      * @return the updated operation
794      */
795     protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
796                     Throwable thrown) {
797         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
798         return setOutcome(params, operation, result);
799     }
800
801     /**
802      * Sets an operation's outcome and default message based on the result.
803      *
804      * @param params operation parameters
805      * @param operation operation to be updated
806      * @param result result of the operation
807      * @return the updated operation
808      */
809     protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
810                     PolicyResult result) {
811         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
812         operation.setResult(result);
813         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
814                         : ControlLoopOperation.FAILED_MSG);
815
816         return operation;
817     }
818
819     /**
820      * Determines if a throwable is due to a timeout.
821      *
822      * @param thrown throwable of interest
823      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
824      */
825     protected boolean isTimeout(Throwable thrown) {
826         if (thrown instanceof CompletionException) {
827             thrown = thrown.getCause();
828         }
829
830         return (thrown instanceof TimeoutException);
831     }
832
833     // these may be overridden by junit tests
834
835     /**
836      * Gets the operation timeout. Subclasses may override this method to obtain the
837      * timeout in some other way (e.g., through configuration properties).
838      *
839      * @param timeoutSec timeout, in seconds, or {@code null}
840      * @return the operation timeout, in milliseconds
841      */
842     protected long getTimeOutMillis(Integer timeoutSec) {
843         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
844     }
845 }