Merge "Allow storing database password in environment variables"
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.CompletionException;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.function.BiConsumer;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37 import java.util.function.UnaryOperator;
38 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
39 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
40 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
41 import org.onap.policy.common.utils.coder.Coder;
42 import org.onap.policy.common.utils.coder.CoderException;
43 import org.onap.policy.common.utils.coder.StandardCoder;
44 import org.onap.policy.controlloop.ControlLoopOperation;
45 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
46 import org.onap.policy.controlloop.actorserviceprovider.Operation;
47 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
48 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
49 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
50 import org.onap.policy.controlloop.policy.PolicyResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Partial implementation of an operator. In general, it's preferable that subclasses
56  * would override {@link #startOperationAsync(int, OperationOutcome)
57  * startOperationAsync()}. However, if that proves to be too difficult, then they can
58  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
59  * if the operation requires any preprocessor steps, the subclass may choose to override
60  * {@link #startPreprocessorAsync()}.
61  * <p/>
62  * The futures returned by the methods within this class can be canceled, and will
63  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
64  * returned by overridden methods will do the same. Of course, if a class overrides
65  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
66  * be done to cancel that particular operation.
67  */
68 public abstract class OperationPartial implements Operation {
    /** Logger shared by all instances of this class. */
    private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);

    // Coder instance; NOTE(review): not referenced in the portion of the class reviewed
    // here - presumably used further down - confirm.
    private static final Coder coder = new StandardCoder();

    /**
     * Default retry wait, in milliseconds. NOTE(review): presumably the value returned
     * by getRetryWaitMs() unless overridden - that method is not visible here, confirm.
     */
    public static final long DEFAULT_RETRY_WAIT_MS = 1000L;

    // values extracted from the operator

    /**
     * Operator that created this operation. The name getters, the blocking executor and
     * the "alive" check in start() all delegate to it.
     */
    private final OperatorPartial operator;

    /**
     * Operation parameters.
     */
    protected final ControlLoopOperationParams params;
82
83
84     /**
85      * Constructs the object.
86      *
87      * @param params operation parameters
88      * @param operator operator that created this operation
89      */
90     public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
91         this.params = params;
92         this.operator = operator;
93     }
94
95     public Executor getBlockingExecutor() {
96         return operator.getBlockingExecutor();
97     }
98
99     public String getFullName() {
100         return operator.getFullName();
101     }
102
103     public String getActorName() {
104         return operator.getActorName();
105     }
106
107     public String getName() {
108         return operator.getName();
109     }
110
111     @Override
112     public final CompletableFuture<OperationOutcome> start() {
113         if (!operator.isAlive()) {
114             throw new IllegalStateException("operation is not running: " + getFullName());
115         }
116
117         // allocate a controller for the entire operation
118         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
119
120         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
121         if (preproc == null) {
122             // no preprocessor required - just start the operation
123             return startOperationAttempt(controller, 1);
124         }
125
126         /*
127          * Do preprocessor first and then, if successful, start the operation. Note:
128          * operations create their own outcome, ignoring the outcome from any previous
129          * steps.
130          *
131          * Wrap the preprocessor to ensure "stop" is propagated to it.
132          */
133         // @formatter:off
134         controller.wrap(preproc)
135                         .exceptionally(fromException("preprocessor of operation"))
136                         .thenCompose(handlePreprocessorFailure(controller))
137                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
138                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
139         // @formatter:on
140
141         return controller;
142     }
143
144     /**
145      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
146      * invokes the call-backs, marks the controller complete, and returns an incomplete
147      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
148      * received.
149      * <p/>
150      * Assumes that no callbacks have been invoked yet.
151      *
152      * @param controller pipeline controller
153      * @return a function that checks the outcome status and continues, if successful, or
154      *         indicates a failure otherwise
155      */
156     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
157                     PipelineControllerFuture<OperationOutcome> controller) {
158
159         return outcome -> {
160
161             if (outcome != null && isSuccess(outcome)) {
162                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
163                 return CompletableFuture.completedFuture(outcome);
164             }
165
166             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
167
168             final Executor executor = params.getExecutor();
169             final CallbackManager callbacks = new CallbackManager();
170
171             // propagate "stop" to the callbacks
172             controller.add(callbacks);
173
174             final OperationOutcome outcome2 = params.makeOutcome();
175
176             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
177
178             outcome2.setResult(PolicyResult.FAILURE_GUARD);
179             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
180
181             // @formatter:off
182             CompletableFuture.completedFuture(outcome2)
183                             .whenCompleteAsync(callbackStarted(callbacks), executor)
184                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
185                             .whenCompleteAsync(controller.delayedComplete(), executor);
186             // @formatter:on
187
188             return new CompletableFuture<>();
189         };
190     }
191
192     /**
193      * Invokes the operation's preprocessor step(s) as a "future". This method simply
194      * invokes {@link #startGuardAsync()}.
195      * <p/>
196      * This method assumes the following:
197      * <ul>
198      * <li>the operator is alive</li>
199      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
200      * </ul>
201      *
202      * @return a function that will start the preprocessor and returns its outcome, or
203      *         {@code null} if this operation needs no preprocessor
204      */
205     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
206         return startGuardAsync();
207     }
208
209     /**
210      * Invokes the operation's guard step(s) as a "future". This method simply returns
211      * {@code null}.
212      * <p/>
213      * This method assumes the following:
214      * <ul>
215      * <li>the operator is alive</li>
216      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
217      * </ul>
218      *
219      * @return a function that will start the guard checks and returns its outcome, or
220      *         {@code null} if this operation has no guard
221      */
222     protected CompletableFuture<OperationOutcome> startGuardAsync() {
223         return null;
224     }
225
226     /**
227      * Starts the operation attempt, with no preprocessor. When all retries complete, it
228      * will complete the controller.
229      *
230      * @param controller controller for all operation attempts
231      * @param attempt attempt number, typically starting with 1
232      * @return a future that will return the final result of all attempts
233      */
234     private CompletableFuture<OperationOutcome> startOperationAttempt(
235                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
236
237         // propagate "stop" to the operation attempt
238         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
239                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
240
241         return controller;
242     }
243
    /**
     * Starts the operation attempt, without doing any retries. The attempt runs under
     * its own controller: the "started" call-back is invoked, then
     * {@link #startOperationAsync(int, OperationOutcome) startOperationAsync()} is
     * invoked, optionally bounded by a timeout. Any exception is converted into an
     * outcome, the retry flag is applied, and the "completed" call-back is invoked
     * before the attempt's controller is completed.
     *
     * @param attempt attempt number, typically starting with 1
     * @return a future that will return the result of a single operation attempt
     */
    private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {

        logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());

        final Executor executor = params.getExecutor();
        final OperationOutcome outcome = params.makeOutcome();
        final CallbackManager callbacks = new CallbackManager();

        // this operation attempt gets its own controller
        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();

        // propagate "stop" to the callbacks
        controller.add(callbacks);

        // first stage: announce the start, then launch the (wrapped) operation itself
        // @formatter:off
        CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
                        .whenCompleteAsync(callbackStarted(callbacks), executor)
                        .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
        // @formatter:on

        // handle timeouts, if specified; a non-positive value means "no timeout"
        long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
        if (timeoutMillis > 0) {
            logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
            future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        }

        /*
         * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
         * before callbackCompleted() is invoked.
         *
         * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
         * the pipeline as the last step anyway.
         */

        // second stage: exceptions (including a timeout) become outcomes before the
        // retry flag and the call-backs see them
        // @formatter:off
        future.exceptionally(fromException("operation"))
                    .thenApply(setRetryFlag(attempt))
                    .whenCompleteAsync(callbackStarted(callbacks), executor)
                    .whenCompleteAsync(callbackCompleted(callbacks), executor)
                    .whenCompleteAsync(controller.delayedComplete(), executor);
        // @formatter:on

        return controller;
    }
296
297     /**
298      * Determines if the outcome was successful.
299      *
300      * @param outcome outcome to examine
301      * @return {@code true} if the outcome was successful
302      */
303     protected boolean isSuccess(OperationOutcome outcome) {
304         return (outcome.getResult() == PolicyResult.SUCCESS);
305     }
306
307     /**
308      * Determines if the outcome was a failure for this operator.
309      *
310      * @param outcome outcome to examine, or {@code null}
311      * @return {@code true} if the outcome is not {@code null} and was a failure
312      *         <i>and</i> was associated with this operator, {@code false} otherwise
313      */
314     protected boolean isActorFailed(OperationOutcome outcome) {
315         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
316     }
317
318     /**
319      * Determines if the given outcome is for this operation.
320      *
321      * @param outcome outcome to examine
322      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
323      */
324     protected boolean isSameOperation(OperationOutcome outcome) {
325         return OperationOutcome.isFor(outcome, getActorName(), getName());
326     }
327
328     /**
329      * Invokes the operation as a "future". This method simply invokes
330      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
331      * returning the result via a "future".
332      * <p/>
333      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
334      * the executor in the "params", as that may bring the background thread pool to a
335      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
336      * instead.
337      * <p/>
338      * This method assumes the following:
339      * <ul>
340      * <li>the operator is alive</li>
341      * <li>verifyRunning() has been invoked</li>
342      * <li>callbackStarted() has been invoked</li>
343      * <li>the invoker will perform appropriate timeout checks</li>
344      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
345      * </ul>
346      *
347      * @param attempt attempt number, typically starting with 1
348      * @return a function that will start the operation and return its result when
349      *         complete
350      */
351     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
352
353         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
354     }
355
356     /**
357      * Low-level method that performs the operation. This can make the same assumptions
358      * that are made by {@link #doOperationAsFuture()}. This particular method simply
359      * throws an {@link UnsupportedOperationException}.
360      *
361      * @param attempt attempt number, typically starting with 1
362      * @param operation the operation being performed
363      * @return the outcome of the operation
364      */
365     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
366
367         throw new UnsupportedOperationException("start operation " + getFullName());
368     }
369
370     /**
371      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
372      * FAILURE, assuming the policy specifies retries and the retry count has been
373      * exhausted.
374      *
375      * @param attempt latest attempt number, starting with 1
376      * @return a function to get the next future to execute
377      */
378     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
379
380         return operation -> {
381             if (operation != null && !isActorFailed(operation)) {
382                 /*
383                  * wrong type or wrong operation - just leave it as is. No need to log
384                  * anything here, as retryOnFailure() will log a message
385                  */
386                 return operation;
387             }
388
389             // get a non-null operation
390             OperationOutcome oper2;
391             if (operation != null) {
392                 oper2 = operation;
393             } else {
394                 oper2 = params.makeOutcome();
395                 oper2.setResult(PolicyResult.FAILURE);
396             }
397
398             int retry = getRetry(params.getRetry());
399             if (retry > 0 && attempt > retry) {
400                 /*
401                  * retries were specified and we've already tried them all - change to
402                  * FAILURE_RETRIES
403                  */
404                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
405                 oper2.setResult(PolicyResult.FAILURE_RETRIES);
406             }
407
408             return oper2;
409         };
410     }
411
412     /**
413      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
414      * was previously invoked, and thus that the "operation" is not {@code null}.
415      *
416      * @param controller controller for all of the retries
417      * @param attempt latest attempt number, starting with 1
418      * @return a function to get the next future to execute
419      */
420     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
421                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
422
423         return operation -> {
424             if (!isActorFailed(operation)) {
425                 // wrong type or wrong operation - just leave it as is
426                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
427                 controller.complete(operation);
428                 return new CompletableFuture<>();
429             }
430
431             if (getRetry(params.getRetry()) <= 0) {
432                 // no retries - already marked as FAILURE, so just return it
433                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
434                 controller.complete(operation);
435                 return new CompletableFuture<>();
436             }
437
438             /*
439              * Retry the operation.
440              */
441             long waitMs = getRetryWaitMs();
442             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
443
444             return sleep(waitMs, TimeUnit.MILLISECONDS)
445                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
446         };
447     }
448
449     /**
450      * Convenience method that starts a sleep(), running via a future.
451      *
452      * @param sleepTime time to sleep
453      * @param unit time unit
454      * @return a future that will complete when the sleep completes
455      */
456     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
457         if (sleepTime <= 0) {
458             return CompletableFuture.completedFuture(null);
459         }
460
461         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
462     }
463
464     /**
465      * Converts an exception into an operation outcome, returning a copy of the outcome to
466      * prevent background jobs from changing it.
467      *
468      * @param type type of item throwing the exception
469      * @return a function that will convert an exception into an operation outcome
470      */
471     private Function<Throwable, OperationOutcome> fromException(String type) {
472
473         return thrown -> {
474             OperationOutcome outcome = params.makeOutcome();
475
476             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
477                             params.getRequestId(), thrown);
478
479             return setOutcome(outcome, thrown);
480         };
481     }
482
483     /**
484      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
485      * any outstanding futures when one completes.
486      *
487      * @param futureMakers function to make a future. If the function returns
488      *        {@code null}, then no future is created for that function. On the other
489      *        hand, if the function throws an exception, then the previously created
490      *        functions are canceled and the exception is re-thrown
491      * @return a future to cancel or await an outcome, or {@code null} if no futures were
492      *         created. If this future is canceled, then all of the futures will be
493      *         canceled
494      */
495     protected CompletableFuture<OperationOutcome> anyOf(
496                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
497
498         return anyOf(Arrays.asList(futureMakers));
499     }
500
501     /**
502      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
503      * any outstanding futures when one completes.
504      *
505      * @param futureMakers function to make a future. If the function returns
506      *        {@code null}, then no future is created for that function. On the other
507      *        hand, if the function throws an exception, then the previously created
508      *        functions are canceled and the exception is re-thrown
509      * @return a future to cancel or await an outcome, or {@code null} if no futures were
510      *         created. If this future is canceled, then all of the futures will be
511      *         canceled. Similarly, when this future completes, any incomplete futures
512      *         will be canceled
513      */
514     protected CompletableFuture<OperationOutcome> anyOf(
515                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
516
517         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
518
519         CompletableFuture<OperationOutcome>[] futures =
520                         attachFutures(controller, futureMakers, UnaryOperator.identity());
521
522         if (futures.length == 0) {
523             // no futures were started
524             return null;
525         }
526
527         if (futures.length == 1) {
528             return futures[0];
529         }
530
531         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
532                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
533
534         return controller;
535     }
536
537     /**
538      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
539      *
540      * @param futureMakers function to make a future. If the function returns
541      *        {@code null}, then no future is created for that function. On the other
542      *        hand, if the function throws an exception, then the previously created
543      *        functions are canceled and the exception is re-thrown
544      * @return a future to cancel or await an outcome, or {@code null} if no futures were
545      *         created. If this future is canceled, then all of the futures will be
546      *         canceled
547      */
548     protected CompletableFuture<OperationOutcome> allOf(
549                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
550
551         return allOf(Arrays.asList(futureMakers));
552     }
553
554     /**
555      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
556      *
557      * @param futureMakers function to make a future. If the function returns
558      *        {@code null}, then no future is created for that function. On the other
559      *        hand, if the function throws an exception, then the previously created
560      *        functions are canceled and the exception is re-thrown
561      * @return a future to cancel or await an outcome, or {@code null} if no futures were
562      *         created. If this future is canceled, then all of the futures will be
563      *         canceled. Similarly, when this future completes, any incomplete futures
564      *         will be canceled
565      */
566     protected CompletableFuture<OperationOutcome> allOf(
567                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
568         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
569
570         Queue<OperationOutcome> outcomes = new LinkedList<>();
571
572         CompletableFuture<OperationOutcome>[] futures =
573                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
574                             synchronized (outcomes) {
575                                 outcomes.add(outcome);
576                             }
577                             return outcome;
578                         }));
579
580         if (futures.length == 0) {
581             // no futures were started
582             return null;
583         }
584
585         if (futures.length == 1) {
586             return futures[0];
587         }
588
589         // @formatter:off
590         CompletableFuture.allOf(futures)
591                         .thenApply(unused -> combineOutcomes(outcomes))
592                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
593         // @formatter:on
594
595         return controller;
596     }
597
598     /**
599      * Invokes the functions to create the futures and attaches them to the controller.
600      *
601      * @param controller master controller for all of the futures
602      * @param futureMakers futures to be attached to the controller
603      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
604      *        Returns the adorned future
605      * @return an array of futures, possibly zero-length. If the array is of size one,
606      *         then that one item should be returned instead of the controller
607      */
608     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
609                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
610                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
611
612         if (futureMakers.isEmpty()) {
613             @SuppressWarnings("unchecked")
614             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
615             return result;
616         }
617
618         // the last, unadorned future that is created
619         CompletableFuture<OperationOutcome> lastFuture = null;
620
621         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
622
623         // make each future
624         for (var maker : futureMakers) {
625             try {
626                 CompletableFuture<OperationOutcome> future = maker.get();
627                 if (future == null) {
628                     continue;
629                 }
630
631                 // propagate "stop" to the future
632                 controller.add(future);
633
634                 futures.add(adorn.apply(future));
635
636                 lastFuture = future;
637
638             } catch (RuntimeException e) {
639                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
640                 controller.cancel(false);
641                 throw e;
642             }
643         }
644
645         @SuppressWarnings("unchecked")
646         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
647
648         if (result.length == 1) {
649             // special case - return the unadorned future
650             result[0] = lastFuture;
651             return result;
652         }
653
654         return futures.toArray(result);
655     }
656
657     /**
658      * Combines the outcomes from a set of tasks.
659      *
660      * @param outcomes outcomes to be examined
661      * @return the combined outcome
662      */
663     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
664
665         // identify the outcome with the highest priority
666         OperationOutcome outcome = outcomes.remove();
667         int priority = detmPriority(outcome);
668
669         for (OperationOutcome outcome2 : outcomes) {
670             int priority2 = detmPriority(outcome2);
671
672             if (priority2 > priority) {
673                 outcome = outcome2;
674                 priority = priority2;
675             }
676         }
677
678         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
679                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
680
681         return outcome;
682     }
683
684     /**
685      * Determines the priority of an outcome based on its result.
686      *
687      * @param outcome outcome to examine, or {@code null}
688      * @return the outcome's priority
689      */
690     protected int detmPriority(OperationOutcome outcome) {
691         if (outcome == null || outcome.getResult() == null) {
692             return 1;
693         }
694
695         switch (outcome.getResult()) {
696             case SUCCESS:
697                 return 0;
698
699             case FAILURE_GUARD:
700                 return 2;
701
702             case FAILURE_RETRIES:
703                 return 3;
704
705             case FAILURE:
706                 return 4;
707
708             case FAILURE_TIMEOUT:
709                 return 5;
710
711             case FAILURE_EXCEPTION:
712             default:
713                 return 6;
714         }
715     }
716
717     /**
718      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
719      * not created until the previous task completes. The pipeline returns the outcome of
720      * the last task executed.
721      *
722      * @param futureMakers functions to make the futures
723      * @return a future to cancel the sequence or await the outcome
724      */
725     protected CompletableFuture<OperationOutcome> sequence(
726                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
727
728         return sequence(Arrays.asList(futureMakers));
729     }
730
731     /**
732      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
733      * not created until the previous task completes. The pipeline returns the outcome of
734      * the last task executed.
735      *
736      * @param futureMakers functions to make the futures
737      * @return a future to cancel the sequence or await the outcome, or {@code null} if
738      *         there were no tasks to perform
739      */
740     protected CompletableFuture<OperationOutcome> sequence(
741                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
742
743         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
744
745         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
746         if (nextTask == null) {
747             // no tasks
748             return null;
749         }
750
751         if (queue.isEmpty()) {
752             // only one task - just return it rather than wrapping it in a controller
753             return nextTask;
754         }
755
756         /*
757          * multiple tasks - need a controller to stop whichever task is currently
758          * executing
759          */
760         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
761         final Executor executor = params.getExecutor();
762
763         // @formatter:off
764         controller.wrap(nextTask)
765                     .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
766                     .whenCompleteAsync(controller.delayedComplete(), executor);
767         // @formatter:on
768
769         return controller;
770     }
771
772     /**
773      * Executes the next task in the queue, if the previous outcome was successful.
774      *
775      * @param controller pipeline controller
776      * @param taskQueue queue of tasks to be performed
777      * @return a future to execute the remaining tasks, or the current outcome, if it's a
778      *         failure, or if there are no more tasks
779      */
780     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
781                     PipelineControllerFuture<OperationOutcome> controller,
782                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
783
784         return outcome -> {
785             if (!isSuccess(outcome)) {
786                 // return the failure
787                 return CompletableFuture.completedFuture(outcome);
788             }
789
790             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
791             if (nextTask == null) {
792                 // no tasks - just return the success
793                 return CompletableFuture.completedFuture(outcome);
794             }
795
796             // @formatter:off
797             return controller
798                         .wrap(nextTask)
799                         .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
800             // @formatter:on
801         };
802     }
803
804     /**
805      * Gets the next task from the queue, skipping those that are {@code null}.
806      *
807      * @param taskQueue task queue
808      * @return the next task, or {@code null} if the queue is now empty
809      */
810     private CompletableFuture<OperationOutcome> getNextTask(
811                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
812
813         Supplier<CompletableFuture<OperationOutcome>> maker;
814
815         while ((maker = taskQueue.poll()) != null) {
816             CompletableFuture<OperationOutcome> future = maker.get();
817             if (future != null) {
818                 return future;
819             }
820         }
821
822         return null;
823     }
824
825     /**
826      * Sets the start time of the operation and invokes the callback to indicate that the
827      * operation has started. Does nothing if the pipeline has been stopped.
828      * <p/>
829      * This assumes that the "outcome" is not {@code null}.
830      *
831      * @param callbacks used to determine if the start callback can be invoked
832      * @return a function that sets the start time and invokes the callback
833      */
834     private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
835
836         return (outcome, thrown) -> {
837
838             if (callbacks.canStart()) {
839                 // haven't invoked "start" callback yet
840                 outcome.setStart(callbacks.getStartTime());
841                 outcome.setEnd(null);
842                 params.callbackStarted(outcome);
843             }
844         };
845     }
846
847     /**
848      * Sets the end time of the operation and invokes the callback to indicate that the
849      * operation has completed. Does nothing if the pipeline has been stopped.
850      * <p/>
851      * This assumes that the "outcome" is not {@code null}.
852      * <p/>
853      * Note: the start time must be a reference rather than a plain value, because it's
854      * value must be gotten on-demand, when the returned function is executed at a later
855      * time.
856      *
857      * @param callbacks used to determine if the end callback can be invoked
858      * @return a function that sets the end time and invokes the callback
859      */
860     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
861
862         return (outcome, thrown) -> {
863
864             if (callbacks.canEnd()) {
865                 outcome.setStart(callbacks.getStartTime());
866                 outcome.setEnd(callbacks.getEndTime());
867                 params.callbackCompleted(outcome);
868             }
869         };
870     }
871
872     /**
873      * Sets an operation's outcome and message, based on a throwable.
874      *
875      * @param operation operation to be updated
876      * @return the updated operation
877      */
878     protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
879         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
880         return setOutcome(operation, result);
881     }
882
883     /**
884      * Sets an operation's outcome and default message based on the result.
885      *
886      * @param operation operation to be updated
887      * @param result result of the operation
888      * @return the updated operation
889      */
890     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
891         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
892         operation.setResult(result);
893         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
894                         : ControlLoopOperation.FAILED_MSG);
895
896         return operation;
897     }
898
899     /**
900      * Determines if a throwable is due to a timeout.
901      *
902      * @param thrown throwable of interest
903      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
904      */
905     protected boolean isTimeout(Throwable thrown) {
906         if (thrown instanceof CompletionException) {
907             thrown = thrown.getCause();
908         }
909
910         return (thrown instanceof TimeoutException);
911     }
912
913     /**
914      * Logs a response. If the response is not of type, String, then it attempts to
915      * pretty-print it into JSON before logging.
916      *
917      * @param direction IN or OUT
918      * @param infra communication infrastructure on which it was published
919      * @param source source name (e.g., the URL or Topic name)
920      * @param response response to be logged
921      * @return the JSON text that was logged
922      */
923     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
924         String json;
925         try {
926             if (response == null) {
927                 json = null;
928             } else if (response instanceof String) {
929                 json = response.toString();
930             } else {
931                 json = makeCoder().encode(response, true);
932             }
933
934         } catch (CoderException e) {
935             String type = (direction == EventType.IN ? "response" : "request");
936             logger.warn("cannot pretty-print {}", type, e);
937             json = response.toString();
938         }
939
940         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
941
942         return json;
943     }
944
945     // these may be overridden by subclasses or junit tests
946
947     /**
948      * Gets the retry count.
949      *
950      * @param retry retry, extracted from the parameters, or {@code null}
951      * @return the number of retries, or {@code 0} if no retries were specified
952      */
953     protected int getRetry(Integer retry) {
954         return (retry == null ? 0 : retry);
955     }
956
957     /**
958      * Gets the retry wait, in milliseconds.
959      *
960      * @return the retry wait, in milliseconds
961      */
962     protected long getRetryWaitMs() {
963         return DEFAULT_RETRY_WAIT_MS;
964     }
965
966     /**
967      * Gets the operation timeout.
968      *
969      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
970      *        {@code null}
971      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
972      *         specified
973      */
974     protected long getTimeoutMs(Integer timeoutSec) {
975         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
976     }
977
978     // these may be overridden by junit tests
979
980     protected Coder makeCoder() {
981         return coder;
982     }
983 }