Use BidirectionalTopicClient from policy-common
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.CompletionException;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.function.BiConsumer;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37 import java.util.function.UnaryOperator;
38 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
39 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
40 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
41 import org.onap.policy.common.utils.coder.Coder;
42 import org.onap.policy.common.utils.coder.CoderException;
43 import org.onap.policy.common.utils.coder.StandardCoder;
44 import org.onap.policy.controlloop.ControlLoopOperation;
45 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
46 import org.onap.policy.controlloop.actorserviceprovider.Operation;
47 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
48 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
49 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
50 import org.onap.policy.controlloop.policy.PolicyResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Partial implementation of an operator. In general, it's preferable that subclasses
56  * would override {@link #startOperationAsync(int, OperationOutcome)
57  * startOperationAsync()}. However, if that proves to be too difficult, then they can
58  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
59  * if the operation requires any preprocessor steps, the subclass may choose to override
60  * {@link #startPreprocessorAsync()}.
61  * <p/>
62  * The futures returned by the methods within this class can be canceled, and will
63  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
64  * returned by overridden methods will do the same. Of course, if a class overrides
65  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
66  * be done to cancel that particular operation.
67  */
68 public abstract class OperationPartial implements Operation {
69     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
70     private static final Coder coder = new StandardCoder();
71     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
72
73     // values extracted from the operator
74
75     private final OperatorPartial operator;
76
77     /**
78      * Operation parameters.
79      */
80     protected final ControlLoopOperationParams params;
81
82
83     /**
84      * Constructs the object.
85      *
86      * @param params operation parameters
87      * @param operator operator that created this operation
88      */
89     public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
90         this.params = params;
91         this.operator = operator;
92     }
93
94     public Executor getBlockingExecutor() {
95         return operator.getBlockingExecutor();
96     }
97
98     public String getFullName() {
99         return operator.getFullName();
100     }
101
102     public String getActorName() {
103         return operator.getActorName();
104     }
105
106     public String getName() {
107         return operator.getName();
108     }
109
110     @Override
111     public final CompletableFuture<OperationOutcome> start() {
112         if (!operator.isAlive()) {
113             throw new IllegalStateException("operation is not running: " + getFullName());
114         }
115
116         // allocate a controller for the entire operation
117         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
118
119         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
120         if (preproc == null) {
121             // no preprocessor required - just start the operation
122             return startOperationAttempt(controller, 1);
123         }
124
125         /*
126          * Do preprocessor first and then, if successful, start the operation. Note:
127          * operations create their own outcome, ignoring the outcome from any previous
128          * steps.
129          *
130          * Wrap the preprocessor to ensure "stop" is propagated to it.
131          */
132         // @formatter:off
133         controller.wrap(preproc)
134                         .exceptionally(fromException("preprocessor of operation"))
135                         .thenCompose(handlePreprocessorFailure(controller))
136                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
137                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
138         // @formatter:on
139
140         return controller;
141     }
142
143     /**
144      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
145      * invokes the call-backs, marks the controller complete, and returns an incomplete
146      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
147      * received.
148      * <p/>
149      * Assumes that no callbacks have been invoked yet.
150      *
151      * @param controller pipeline controller
152      * @return a function that checks the outcome status and continues, if successful, or
153      *         indicates a failure otherwise
154      */
155     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
156                     PipelineControllerFuture<OperationOutcome> controller) {
157
158         return outcome -> {
159
160             if (outcome != null && isSuccess(outcome)) {
161                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
162                 return CompletableFuture.completedFuture(outcome);
163             }
164
165             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
166
167             final Executor executor = params.getExecutor();
168             final CallbackManager callbacks = new CallbackManager();
169
170             // propagate "stop" to the callbacks
171             controller.add(callbacks);
172
173             final OperationOutcome outcome2 = params.makeOutcome();
174
175             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
176
177             outcome2.setResult(PolicyResult.FAILURE_GUARD);
178             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
179
180             // @formatter:off
181             CompletableFuture.completedFuture(outcome2)
182                             .whenCompleteAsync(callbackStarted(callbacks), executor)
183                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
184                             .whenCompleteAsync(controller.delayedComplete(), executor);
185             // @formatter:on
186
187             return new CompletableFuture<>();
188         };
189     }
190
191     /**
192      * Invokes the operation's preprocessor step(s) as a "future". This method simply
193      * invokes {@link #startGuardAsync()}.
194      * <p/>
195      * This method assumes the following:
196      * <ul>
197      * <li>the operator is alive</li>
198      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
199      * </ul>
200      *
201      * @return a function that will start the preprocessor and returns its outcome, or
202      *         {@code null} if this operation needs no preprocessor
203      */
204     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
205         return startGuardAsync();
206     }
207
208     /**
209      * Invokes the operation's guard step(s) as a "future". This method simply returns
210      * {@code null}.
211      * <p/>
212      * This method assumes the following:
213      * <ul>
214      * <li>the operator is alive</li>
215      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
216      * </ul>
217      *
218      * @return a function that will start the guard checks and returns its outcome, or
219      *         {@code null} if this operation has no guard
220      */
221     protected CompletableFuture<OperationOutcome> startGuardAsync() {
222         return null;
223     }
224
225     /**
226      * Starts the operation attempt, with no preprocessor. When all retries complete, it
227      * will complete the controller.
228      *
229      * @param controller controller for all operation attempts
230      * @param attempt attempt number, typically starting with 1
231      * @return a future that will return the final result of all attempts
232      */
233     private CompletableFuture<OperationOutcome> startOperationAttempt(
234                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
235
236         // propagate "stop" to the operation attempt
237         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
238                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
239
240         return controller;
241     }
242
243     /**
244      * Starts the operation attempt, without doing any retries.
245      *
246      * @param params operation parameters
247      * @param attempt attempt number, typically starting with 1
248      * @return a future that will return the result of a single operation attempt
249      */
250     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
251
252         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
253
254         final Executor executor = params.getExecutor();
255         final OperationOutcome outcome = params.makeOutcome();
256         final CallbackManager callbacks = new CallbackManager();
257
258         // this operation attempt gets its own controller
259         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
260
261         // propagate "stop" to the callbacks
262         controller.add(callbacks);
263
264         // @formatter:off
265         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
266                         .whenCompleteAsync(callbackStarted(callbacks), executor)
267                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
268         // @formatter:on
269
270         // handle timeouts, if specified
271         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
272         if (timeoutMillis > 0) {
273             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
274             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
275         }
276
277         /*
278          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
279          * before callbackCompleted() is invoked.
280          *
281          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
282          * the pipeline as the last step anyway.
283          */
284
285         // @formatter:off
286         future.exceptionally(fromException("operation"))
287                     .thenApply(setRetryFlag(attempt))
288                     .whenCompleteAsync(callbackStarted(callbacks), executor)
289                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
290                     .whenCompleteAsync(controller.delayedComplete(), executor);
291         // @formatter:on
292
293         return controller;
294     }
295
296     /**
297      * Determines if the outcome was successful.
298      *
299      * @param outcome outcome to examine
300      * @return {@code true} if the outcome was successful
301      */
302     protected boolean isSuccess(OperationOutcome outcome) {
303         return (outcome.getResult() == PolicyResult.SUCCESS);
304     }
305
306     /**
307      * Determines if the outcome was a failure for this operator.
308      *
309      * @param outcome outcome to examine, or {@code null}
310      * @return {@code true} if the outcome is not {@code null} and was a failure
311      *         <i>and</i> was associated with this operator, {@code false} otherwise
312      */
313     protected boolean isActorFailed(OperationOutcome outcome) {
314         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
315     }
316
317     /**
318      * Determines if the given outcome is for this operation.
319      *
320      * @param outcome outcome to examine
321      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
322      */
323     protected boolean isSameOperation(OperationOutcome outcome) {
324         return OperationOutcome.isFor(outcome, getActorName(), getName());
325     }
326
327     /**
328      * Invokes the operation as a "future". This method simply invokes
329      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
330      * returning the result via a "future".
331      * <p/>
332      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
333      * the executor in the "params", as that may bring the background thread pool to a
334      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
335      * instead.
336      * <p/>
337      * This method assumes the following:
338      * <ul>
339      * <li>the operator is alive</li>
340      * <li>verifyRunning() has been invoked</li>
341      * <li>callbackStarted() has been invoked</li>
342      * <li>the invoker will perform appropriate timeout checks</li>
343      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
344      * </ul>
345      *
346      * @param attempt attempt number, typically starting with 1
347      * @return a function that will start the operation and return its result when
348      *         complete
349      */
350     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
351
352         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
353     }
354
355     /**
356      * Low-level method that performs the operation. This can make the same assumptions
357      * that are made by {@link #doOperationAsFuture()}. This particular method simply
358      * throws an {@link UnsupportedOperationException}.
359      *
360      * @param attempt attempt number, typically starting with 1
361      * @param operation the operation being performed
362      * @return the outcome of the operation
363      */
364     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
365
366         throw new UnsupportedOperationException("start operation " + getFullName());
367     }
368
369     /**
370      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
371      * FAILURE, assuming the policy specifies retries and the retry count has been
372      * exhausted.
373      *
374      * @param attempt latest attempt number, starting with 1
375      * @return a function to get the next future to execute
376      */
377     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
378
379         return operation -> {
380             if (operation != null && !isActorFailed(operation)) {
381                 /*
382                  * wrong type or wrong operation - just leave it as is. No need to log
383                  * anything here, as retryOnFailure() will log a message
384                  */
385                 return operation;
386             }
387
388             // get a non-null operation
389             OperationOutcome oper2;
390             if (operation != null) {
391                 oper2 = operation;
392             } else {
393                 oper2 = params.makeOutcome();
394                 oper2.setResult(PolicyResult.FAILURE);
395             }
396
397             int retry = getRetry(params.getRetry());
398             if (retry > 0 && attempt > retry) {
399                 /*
400                  * retries were specified and we've already tried them all - change to
401                  * FAILURE_RETRIES
402                  */
403                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
404                 oper2.setResult(PolicyResult.FAILURE_RETRIES);
405             }
406
407             return oper2;
408         };
409     }
410
411     /**
412      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
413      * was previously invoked, and thus that the "operation" is not {@code null}.
414      *
415      * @param controller controller for all of the retries
416      * @param attempt latest attempt number, starting with 1
417      * @return a function to get the next future to execute
418      */
419     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
420                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
421
422         return operation -> {
423             if (!isActorFailed(operation)) {
424                 // wrong type or wrong operation - just leave it as is
425                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
426                 controller.complete(operation);
427                 return new CompletableFuture<>();
428             }
429
430             if (getRetry(params.getRetry()) <= 0) {
431                 // no retries - already marked as FAILURE, so just return it
432                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
433                 controller.complete(operation);
434                 return new CompletableFuture<>();
435             }
436
437             /*
438              * Retry the operation.
439              */
440             long waitMs = getRetryWaitMs();
441             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
442
443             return sleep(waitMs, TimeUnit.MILLISECONDS)
444                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
445         };
446     }
447
448     /**
449      * Convenience method that starts a sleep(), running via a future.
450      *
451      * @param sleepTime time to sleep
452      * @param unit time unit
453      * @return a future that will complete when the sleep completes
454      */
455     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
456         if (sleepTime <= 0) {
457             return CompletableFuture.completedFuture(null);
458         }
459
460         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
461     }
462
463     /**
464      * Converts an exception into an operation outcome, returning a copy of the outcome to
465      * prevent background jobs from changing it.
466      *
467      * @param type type of item throwing the exception
468      * @return a function that will convert an exception into an operation outcome
469      */
470     private Function<Throwable, OperationOutcome> fromException(String type) {
471
472         return thrown -> {
473             OperationOutcome outcome = params.makeOutcome();
474
475             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
476                             params.getRequestId(), thrown);
477
478             return setOutcome(outcome, thrown);
479         };
480     }
481
482     /**
483      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
484      * any outstanding futures when one completes.
485      *
486      * @param futureMakers function to make a future. If the function returns
487      *        {@code null}, then no future is created for that function. On the other
488      *        hand, if the function throws an exception, then the previously created
489      *        functions are canceled and the exception is re-thrown
490      * @return a future to cancel or await an outcome, or {@code null} if no futures were
491      *         created. If this future is canceled, then all of the futures will be
492      *         canceled
493      */
494     protected CompletableFuture<OperationOutcome> anyOf(
495                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
496
497         return anyOf(Arrays.asList(futureMakers));
498     }
499
500     /**
501      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
502      * any outstanding futures when one completes.
503      *
504      * @param futureMakers function to make a future. If the function returns
505      *        {@code null}, then no future is created for that function. On the other
506      *        hand, if the function throws an exception, then the previously created
507      *        functions are canceled and the exception is re-thrown
508      * @return a future to cancel or await an outcome, or {@code null} if no futures were
509      *         created. If this future is canceled, then all of the futures will be
510      *         canceled. Similarly, when this future completes, any incomplete futures
511      *         will be canceled
512      */
513     protected CompletableFuture<OperationOutcome> anyOf(
514                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
515
516         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
517
518         CompletableFuture<OperationOutcome>[] futures =
519                         attachFutures(controller, futureMakers, UnaryOperator.identity());
520
521         if (futures.length == 0) {
522             // no futures were started
523             return null;
524         }
525
526         if (futures.length == 1) {
527             return futures[0];
528         }
529
530         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
531                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
532
533         return controller;
534     }
535
536     /**
537      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
538      *
539      * @param futureMakers function to make a future. If the function returns
540      *        {@code null}, then no future is created for that function. On the other
541      *        hand, if the function throws an exception, then the previously created
542      *        functions are canceled and the exception is re-thrown
543      * @return a future to cancel or await an outcome, or {@code null} if no futures were
544      *         created. If this future is canceled, then all of the futures will be
545      *         canceled
546      */
547     protected CompletableFuture<OperationOutcome> allOf(
548                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
549
550         return allOf(Arrays.asList(futureMakers));
551     }
552
553     /**
554      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
555      *
556      * @param futureMakers function to make a future. If the function returns
557      *        {@code null}, then no future is created for that function. On the other
558      *        hand, if the function throws an exception, then the previously created
559      *        functions are canceled and the exception is re-thrown
560      * @return a future to cancel or await an outcome, or {@code null} if no futures were
561      *         created. If this future is canceled, then all of the futures will be
562      *         canceled. Similarly, when this future completes, any incomplete futures
563      *         will be canceled
564      */
565     protected CompletableFuture<OperationOutcome> allOf(
566                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
567         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
568
569         Queue<OperationOutcome> outcomes = new LinkedList<>();
570
571         CompletableFuture<OperationOutcome>[] futures =
572                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
573                             synchronized (outcomes) {
574                                 outcomes.add(outcome);
575                             }
576                             return outcome;
577                         }));
578
579         if (futures.length == 0) {
580             // no futures were started
581             return null;
582         }
583
584         if (futures.length == 1) {
585             return futures[0];
586         }
587
588         // @formatter:off
589         CompletableFuture.allOf(futures)
590                         .thenApply(unused -> combineOutcomes(outcomes))
591                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
592         // @formatter:on
593
594         return controller;
595     }
596
597     /**
598      * Invokes the functions to create the futures and attaches them to the controller.
599      *
600      * @param controller master controller for all of the futures
601      * @param futureMakers futures to be attached to the controller
602      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
603      *        Returns the adorned future
604      * @return an array of futures, possibly zero-length. If the array is of size one,
605      *         then that one item should be returned instead of the controller
606      */
607     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
608                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
609                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
610
611         if (futureMakers.isEmpty()) {
612             @SuppressWarnings("unchecked")
613             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
614             return result;
615         }
616
617         // the last, unadorned future that is created
618         CompletableFuture<OperationOutcome> lastFuture = null;
619
620         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
621
622         // make each future
623         for (var maker : futureMakers) {
624             try {
625                 CompletableFuture<OperationOutcome> future = maker.get();
626                 if (future == null) {
627                     continue;
628                 }
629
630                 // propagate "stop" to the future
631                 controller.add(future);
632
633                 futures.add(adorn.apply(future));
634
635                 lastFuture = future;
636
637             } catch (RuntimeException e) {
638                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
639                 controller.cancel(false);
640                 throw e;
641             }
642         }
643
644         @SuppressWarnings("unchecked")
645         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
646
647         if (result.length == 1) {
648             // special case - return the unadorned future
649             result[0] = lastFuture;
650             return result;
651         }
652
653         return futures.toArray(result);
654     }
655
656     /**
657      * Combines the outcomes from a set of tasks.
658      *
659      * @param outcomes outcomes to be examined
660      * @return the combined outcome
661      */
662     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
663
664         // identify the outcome with the highest priority
665         OperationOutcome outcome = outcomes.remove();
666         int priority = detmPriority(outcome);
667
668         for (OperationOutcome outcome2 : outcomes) {
669             int priority2 = detmPriority(outcome2);
670
671             if (priority2 > priority) {
672                 outcome = outcome2;
673                 priority = priority2;
674             }
675         }
676
677         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
678                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
679
680         return outcome;
681     }
682
683     /**
684      * Determines the priority of an outcome based on its result.
685      *
686      * @param outcome outcome to examine, or {@code null}
687      * @return the outcome's priority
688      */
689     protected int detmPriority(OperationOutcome outcome) {
690         if (outcome == null || outcome.getResult() == null) {
691             return 1;
692         }
693
694         switch (outcome.getResult()) {
695             case SUCCESS:
696                 return 0;
697
698             case FAILURE_GUARD:
699                 return 2;
700
701             case FAILURE_RETRIES:
702                 return 3;
703
704             case FAILURE:
705                 return 4;
706
707             case FAILURE_TIMEOUT:
708                 return 5;
709
710             case FAILURE_EXCEPTION:
711             default:
712                 return 6;
713         }
714     }
715
716     /**
717      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
718      * not created until the previous task completes. The pipeline returns the outcome of
719      * the last task executed.
720      *
721      * @param futureMakers functions to make the futures
722      * @return a future to cancel the sequence or await the outcome
723      */
724     protected CompletableFuture<OperationOutcome> sequence(
725                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
726
727         return sequence(Arrays.asList(futureMakers));
728     }
729
730     /**
731      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
732      * not created until the previous task completes. The pipeline returns the outcome of
733      * the last task executed.
734      *
735      * @param futureMakers functions to make the futures
736      * @return a future to cancel the sequence or await the outcome, or {@code null} if
737      *         there were no tasks to perform
738      */
739     protected CompletableFuture<OperationOutcome> sequence(
740                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
741
742         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
743
744         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
745         if (nextTask == null) {
746             // no tasks
747             return null;
748         }
749
750         if (queue.isEmpty()) {
751             // only one task - just return it rather than wrapping it in a controller
752             return nextTask;
753         }
754
755         /*
756          * multiple tasks - need a controller to stop whichever task is currently
757          * executing
758          */
759         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
760         final Executor executor = params.getExecutor();
761
762         // @formatter:off
763         controller.wrap(nextTask)
764                     .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
765                     .whenCompleteAsync(controller.delayedComplete(), executor);
766         // @formatter:on
767
768         return controller;
769     }
770
771     /**
772      * Executes the next task in the queue, if the previous outcome was successful.
773      *
774      * @param controller pipeline controller
775      * @param taskQueue queue of tasks to be performed
776      * @return a future to execute the remaining tasks, or the current outcome, if it's a
777      *         failure, or if there are no more tasks
778      */
779     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
780                     PipelineControllerFuture<OperationOutcome> controller,
781                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
782
783         return outcome -> {
784             if (!isSuccess(outcome)) {
785                 // return the failure
786                 return CompletableFuture.completedFuture(outcome);
787             }
788
789             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
790             if (nextTask == null) {
791                 // no tasks - just return the success
792                 return CompletableFuture.completedFuture(outcome);
793             }
794
795             // @formatter:off
796             return controller
797                         .wrap(nextTask)
798                         .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
799             // @formatter:on
800         };
801     }
802
803     /**
804      * Gets the next task from the queue, skipping those that are {@code null}.
805      *
806      * @param taskQueue task queue
807      * @return the next task, or {@code null} if the queue is now empty
808      */
809     private CompletableFuture<OperationOutcome> getNextTask(
810                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
811
812         Supplier<CompletableFuture<OperationOutcome>> maker;
813
814         while ((maker = taskQueue.poll()) != null) {
815             CompletableFuture<OperationOutcome> future = maker.get();
816             if (future != null) {
817                 return future;
818             }
819         }
820
821         return null;
822     }
823
824     /**
825      * Sets the start time of the operation and invokes the callback to indicate that the
826      * operation has started. Does nothing if the pipeline has been stopped.
827      * <p/>
828      * This assumes that the "outcome" is not {@code null}.
829      *
830      * @param callbacks used to determine if the start callback can be invoked
831      * @return a function that sets the start time and invokes the callback
832      */
833     private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
834
835         return (outcome, thrown) -> {
836
837             if (callbacks.canStart()) {
838                 // haven't invoked "start" callback yet
839                 outcome.setStart(callbacks.getStartTime());
840                 outcome.setEnd(null);
841                 params.callbackStarted(outcome);
842             }
843         };
844     }
845
846     /**
847      * Sets the end time of the operation and invokes the callback to indicate that the
848      * operation has completed. Does nothing if the pipeline has been stopped.
849      * <p/>
850      * This assumes that the "outcome" is not {@code null}.
851      * <p/>
852      * Note: the start time must be a reference rather than a plain value, because it's
853      * value must be gotten on-demand, when the returned function is executed at a later
854      * time.
855      *
856      * @param callbacks used to determine if the end callback can be invoked
857      * @return a function that sets the end time and invokes the callback
858      */
859     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
860
861         return (outcome, thrown) -> {
862
863             if (callbacks.canEnd()) {
864                 outcome.setStart(callbacks.getStartTime());
865                 outcome.setEnd(callbacks.getEndTime());
866                 params.callbackCompleted(outcome);
867             }
868         };
869     }
870
871     /**
872      * Sets an operation's outcome and message, based on a throwable.
873      *
874      * @param operation operation to be updated
875      * @return the updated operation
876      */
877     protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
878         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
879         return setOutcome(operation, result);
880     }
881
882     /**
883      * Sets an operation's outcome and default message based on the result.
884      *
885      * @param operation operation to be updated
886      * @param result result of the operation
887      * @return the updated operation
888      */
889     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
890         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
891         operation.setResult(result);
892         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
893                         : ControlLoopOperation.FAILED_MSG);
894
895         return operation;
896     }
897
898     /**
899      * Determines if a throwable is due to a timeout.
900      *
901      * @param thrown throwable of interest
902      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
903      */
904     protected boolean isTimeout(Throwable thrown) {
905         if (thrown instanceof CompletionException) {
906             thrown = thrown.getCause();
907         }
908
909         return (thrown instanceof TimeoutException);
910     }
911
912     /**
913      * Logs a response. If the response is not of type, String, then it attempts to
914      * pretty-print it into JSON before logging.
915      *
916      * @param direction IN or OUT
917      * @param infra communication infrastructure on which it was published
918      * @param source source name (e.g., the URL or Topic name)
919      * @param response response to be logged
920      * @return the JSON text that was logged
921      */
922     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
923         String json;
924         try {
925             if (response == null) {
926                 json = null;
927             } else if (response instanceof String) {
928                 json = response.toString();
929             } else {
930                 json = makeCoder().encode(response, true);
931             }
932
933         } catch (CoderException e) {
934             String type = (direction == EventType.IN ? "response" : "request");
935             logger.warn("cannot pretty-print {}", type, e);
936             json = response.toString();
937         }
938
939         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
940
941         return json;
942     }
943
944     // these may be overridden by subclasses or junit tests
945
946     /**
947      * Gets the retry count.
948      *
949      * @param retry retry, extracted from the parameters, or {@code null}
950      * @return the number of retries, or {@code 0} if no retries were specified
951      */
952     protected int getRetry(Integer retry) {
953         return (retry == null ? 0 : retry);
954     }
955
956     /**
957      * Gets the retry wait, in milliseconds.
958      *
959      * @return the retry wait, in milliseconds
960      */
961     protected long getRetryWaitMs() {
962         return DEFAULT_RETRY_WAIT_MS;
963     }
964
965     /**
966      * Gets the operation timeout.
967      *
968      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
969      *        {@code null}
970      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
971      *         specified
972      */
973     protected long getTimeoutMs(Integer timeoutSec) {
974         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
975     }
976
977     // these may be overridden by junit tests
978
979     protected Coder makeCoder() {
980         return coder;
981     }
982 }