Merge "Add version to compliant operational policies"
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedHashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionException;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import java.util.function.BiConsumer;
38 import java.util.function.Function;
39 import java.util.function.Supplier;
40 import java.util.function.UnaryOperator;
41 import lombok.Getter;
42 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
43 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
44 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
45 import org.onap.policy.common.utils.coder.Coder;
46 import org.onap.policy.common.utils.coder.CoderException;
47 import org.onap.policy.common.utils.coder.StandardCoder;
48 import org.onap.policy.controlloop.ControlLoopOperation;
49 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
50 import org.onap.policy.controlloop.actorserviceprovider.Operation;
51 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
52 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
53 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
54 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
55 import org.onap.policy.controlloop.policy.PolicyResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 /**
60  * Partial implementation of an operator. In general, it's preferable that subclasses
61  * would override {@link #startOperationAsync(int, OperationOutcome)
62  * startOperationAsync()}. However, if that proves to be too difficult, then they can
63  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
64  * if the operation requires any preprocessor steps, the subclass may choose to override
65  * {@link #startPreprocessorAsync()}.
66  * <p/>
67  * The futures returned by the methods within this class can be canceled, and will
68  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
69  * returned by overridden methods will do the same. Of course, if a class overrides
70  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
71  * be done to cancel that particular operation.
72  * <p/>
73  * In general tasks in a pipeline are executed by the same thread. However, the following
74  * should always be executed via the executor specified in "params":
75  * <ul>
76  * <li>start callback</li>
77  * <li>completion callback</li>
78  * <li>controller completion (i.e., delayedComplete())</li>
79  * </ul>
80  */
81 public abstract class OperationPartial implements Operation {
82     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
83     private static final Coder coder = new StandardCoder();
84
85     public static final String GUARD_ACTOR_NAME = "GUARD";
86     public static final String GUARD_OPERATION_NAME = "Decision";
87     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
88
89     private final OperatorConfig config;
90
91     /**
92      * Operation parameters.
93      */
94     protected final ControlLoopOperationParams params;
95
96     @Getter
97     private final String fullName;
98
99
100     /**
101      * Constructs the object.
102      *
103      * @param params operation parameters
104      * @param config configuration for this operation
105      */
106     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
107         this.params = params;
108         this.config = config;
109         this.fullName = params.getActor() + "." + params.getOperation();
110     }
111
112     public Executor getBlockingExecutor() {
113         return config.getBlockingExecutor();
114     }
115
116     public String getActorName() {
117         return params.getActor();
118     }
119
120     public String getName() {
121         return params.getOperation();
122     }
123
124     @Override
125     public CompletableFuture<OperationOutcome> start() {
126         // allocate a controller for the entire operation
127         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
128
129         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
130         if (preproc == null) {
131             // no preprocessor required - just start the operation
132             return startOperationAttempt(controller, 1);
133         }
134
135         /*
136          * Do preprocessor first and then, if successful, start the operation. Note:
137          * operations create their own outcome, ignoring the outcome from any previous
138          * steps.
139          *
140          * Wrap the preprocessor to ensure "stop" is propagated to it.
141          */
142         // @formatter:off
143         controller.wrap(preproc)
144                         .exceptionally(fromException("preprocessor of operation"))
145                         .thenCompose(handlePreprocessorFailure(controller))
146                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
147                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
148         // @formatter:on
149
150         return controller;
151     }
152
153     /**
154      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
155      * invokes the call-backs, marks the controller complete, and returns an incomplete
156      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
157      * received.
158      * <p/>
159      * Assumes that no callbacks have been invoked yet.
160      *
161      * @param controller pipeline controller
162      * @return a function that checks the outcome status and continues, if successful, or
163      *         indicates a failure otherwise
164      */
165     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
166                     PipelineControllerFuture<OperationOutcome> controller) {
167
168         return outcome -> {
169
170             if (isSuccess(outcome)) {
171                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
172                 return CompletableFuture.completedFuture(outcome);
173             }
174
175             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
176
177             final Executor executor = params.getExecutor();
178             final CallbackManager callbacks = new CallbackManager();
179
180             // propagate "stop" to the callbacks
181             controller.add(callbacks);
182
183             final OperationOutcome outcome2 = params.makeOutcome();
184
185             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
186
187             outcome2.setFinalOutcome(true);
188             outcome2.setResult(PolicyResult.FAILURE_GUARD);
189             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
190
191             // @formatter:off
192             CompletableFuture.completedFuture(outcome2)
193                             .whenCompleteAsync(callbackStarted(callbacks), executor)
194                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
195                             .whenCompleteAsync(controller.delayedComplete(), executor);
196             // @formatter:on
197
198             return new CompletableFuture<>();
199         };
200     }
201
202     /**
203      * Invokes the operation's preprocessor step(s) as a "future". This method simply
204      * returns {@code null}.
205      * <p/>
206      * This method assumes the following:
207      * <ul>
208      * <li>the operator is alive</li>
209      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
210      * </ul>
211      *
212      * @return a function that will start the preprocessor and returns its outcome, or
213      *         {@code null} if this operation needs no preprocessor
214      */
215     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
216         return null;
217     }
218
219     /**
220      * Invokes the operation's guard step(s) as a "future".
221      * <p/>
222      * This method assumes the following:
223      * <ul>
224      * <li>the operator is alive</li>
225      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
226      * </ul>
227      *
228      * @return a function that will start the guard checks and returns its outcome, or
229      *         {@code null} if this operation has no guard
230      */
231     protected CompletableFuture<OperationOutcome> startGuardAsync() {
232         // get the guard payload
233         Map<String, Object> payload = makeGuardPayload();
234
235         /*
236          * Note: can't use constants from actor.guard, because that would create a
237          * circular dependency.
238          */
239         return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
240                         .payload(payload).build().start();
241     }
242
243     /**
244      * Creates a payload to execute a guard operation.
245      *
246      * @return a new guard payload
247      */
248     protected Map<String, Object> makeGuardPayload() {
249         Map<String, Object> guard = new LinkedHashMap<>();
250         guard.put("actor", params.getActor());
251         guard.put("operation", params.getOperation());
252         guard.put("target", params.getTargetEntity());
253         guard.put("requestId", params.getRequestId());
254
255         String clname = params.getContext().getEvent().getClosedLoopControlName();
256         if (clname != null) {
257             guard.put("clname", clname);
258         }
259
260         return guard;
261     }
262
263     /**
264      * Starts the operation attempt, with no preprocessor. When all retries complete, it
265      * will complete the controller.
266      *
267      * @param controller controller for all operation attempts
268      * @param attempt attempt number, typically starting with 1
269      * @return a future that will return the final result of all attempts
270      */
271     private CompletableFuture<OperationOutcome> startOperationAttempt(
272                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
273
274         // propagate "stop" to the operation attempt
275         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
276                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
277
278         return controller;
279     }
280
281     /**
282      * Starts the operation attempt, without doing any retries.
283      *
284      * @param params operation parameters
285      * @param attempt attempt number, typically starting with 1
286      * @return a future that will return the result of a single operation attempt
287      */
288     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
289
290         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
291
292         final Executor executor = params.getExecutor();
293         final OperationOutcome outcome = params.makeOutcome();
294         final CallbackManager callbacks = new CallbackManager();
295
296         // this operation attempt gets its own controller
297         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
298
299         // propagate "stop" to the callbacks
300         controller.add(callbacks);
301
302         // @formatter:off
303         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
304                         .whenCompleteAsync(callbackStarted(callbacks), executor)
305                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
306         // @formatter:on
307
308         // handle timeouts, if specified
309         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
310         if (timeoutMillis > 0) {
311             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
312             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
313         }
314
315         /*
316          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
317          * before callbackCompleted() is invoked.
318          *
319          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
320          * the pipeline as the last step anyway.
321          */
322
323         // @formatter:off
324         future.exceptionally(fromException("operation"))
325                     .thenApply(setRetryFlag(attempt))
326                     .whenCompleteAsync(callbackStarted(callbacks), executor)
327                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
328                     .whenCompleteAsync(controller.delayedComplete(), executor);
329         // @formatter:on
330
331         return controller;
332     }
333
334     /**
335      * Determines if the outcome was successful.
336      *
337      * @param outcome outcome to examine
338      * @return {@code true} if the outcome was successful
339      */
340     protected boolean isSuccess(OperationOutcome outcome) {
341         return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
342     }
343
344     /**
345      * Determines if the outcome was a failure for this operator.
346      *
347      * @param outcome outcome to examine, or {@code null}
348      * @return {@code true} if the outcome is not {@code null} and was a failure
349      *         <i>and</i> was associated with this operator, {@code false} otherwise
350      */
351     protected boolean isActorFailed(OperationOutcome outcome) {
352         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
353     }
354
355     /**
356      * Determines if the given outcome is for this operation.
357      *
358      * @param outcome outcome to examine
359      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
360      */
361     protected boolean isSameOperation(OperationOutcome outcome) {
362         return OperationOutcome.isFor(outcome, getActorName(), getName());
363     }
364
365     /**
366      * Invokes the operation as a "future". This method simply invokes
367      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
368      * returning the result via a "future".
369      * <p/>
370      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
371      * the executor in the "params", as that may bring the background thread pool to a
372      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
373      * instead.
374      * <p/>
375      * This method assumes the following:
376      * <ul>
377      * <li>the operator is alive</li>
378      * <li>verifyRunning() has been invoked</li>
379      * <li>callbackStarted() has been invoked</li>
380      * <li>the invoker will perform appropriate timeout checks</li>
381      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
382      * </ul>
383      *
384      * @param attempt attempt number, typically starting with 1
385      * @return a function that will start the operation and return its result when
386      *         complete
387      */
388     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
389
390         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
391     }
392
393     /**
394      * Low-level method that performs the operation. This can make the same assumptions
395      * that are made by {@link #doOperationAsFuture()}. This particular method simply
396      * throws an {@link UnsupportedOperationException}.
397      *
398      * @param attempt attempt number, typically starting with 1
399      * @param operation the operation being performed
400      * @return the outcome of the operation
401      */
402     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
403
404         throw new UnsupportedOperationException("start operation " + getFullName());
405     }
406
407     /**
408      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
409      * FAILURE, assuming the policy specifies retries and the retry count has been
410      * exhausted.
411      *
412      * @param attempt latest attempt number, starting with 1
413      * @return a function to get the next future to execute
414      */
415     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
416
417         return origOutcome -> {
418             // ensure we have a non-null outcome
419             OperationOutcome outcome;
420             if (origOutcome != null) {
421                 outcome = origOutcome;
422             } else {
423                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
424                 outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
425             }
426
427             // ensure correct actor/operation
428             outcome.setActor(getActorName());
429             outcome.setOperation(getName());
430
431             // determine if we should retry, based on the result
432             if (outcome.getResult() != PolicyResult.FAILURE) {
433                 // do not retry success or other failure types (e.g., exception)
434                 outcome.setFinalOutcome(true);
435                 return outcome;
436             }
437
438             int retry = getRetry(params.getRetry());
439             if (retry <= 0) {
440                 // no retries were specified
441                 outcome.setFinalOutcome(true);
442
443             } else if (attempt <= retry) {
444                 // have more retries - not the final outcome
445                 outcome.setFinalOutcome(false);
446
447             } else {
448                 /*
449                  * retries were specified and we've already tried them all - change to
450                  * FAILURE_RETRIES
451                  */
452                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
453                 outcome.setResult(PolicyResult.FAILURE_RETRIES);
454                 outcome.setFinalOutcome(true);
455             }
456
457             return outcome;
458         };
459     }
460
461     /**
462      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
463      * was previously invoked, and thus that the "operation" is not {@code null}.
464      *
465      * @param controller controller for all of the retries
466      * @param attempt latest attempt number, starting with 1
467      * @return a function to get the next future to execute
468      */
469     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
470                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
471
472         return operation -> {
473             if (!isActorFailed(operation)) {
474                 // wrong type or wrong operation - just leave it as is
475                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
476                 controller.complete(operation);
477                 return new CompletableFuture<>();
478             }
479
480             if (getRetry(params.getRetry()) <= 0) {
481                 // no retries - already marked as FAILURE, so just return it
482                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
483                 controller.complete(operation);
484                 return new CompletableFuture<>();
485             }
486
487             /*
488              * Retry the operation.
489              */
490             long waitMs = getRetryWaitMs();
491             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
492
493             return sleep(waitMs, TimeUnit.MILLISECONDS)
494                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
495         };
496     }
497
498     /**
499      * Convenience method that starts a sleep(), running via a future.
500      *
501      * @param sleepTime time to sleep
502      * @param unit time unit
503      * @return a future that will complete when the sleep completes
504      */
505     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
506         if (sleepTime <= 0) {
507             return CompletableFuture.completedFuture(null);
508         }
509
510         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
511     }
512
513     /**
514      * Converts an exception into an operation outcome, returning a copy of the outcome to
515      * prevent background jobs from changing it.
516      *
517      * @param type type of item throwing the exception
518      * @return a function that will convert an exception into an operation outcome
519      */
520     private Function<Throwable, OperationOutcome> fromException(String type) {
521
522         return thrown -> {
523             OperationOutcome outcome = params.makeOutcome();
524
525             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
526                 // do not include exception in the message, as it just clutters the log
527                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
528                                 params.getRequestId());
529             } else {
530                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
531                                 params.getRequestId(), thrown);
532             }
533
534             return setOutcome(outcome, thrown);
535         };
536     }
537
538     /**
539      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
540      * any outstanding futures when one completes.
541      *
542      * @param futureMakers function to make a future. If the function returns
543      *        {@code null}, then no future is created for that function. On the other
544      *        hand, if the function throws an exception, then the previously created
545      *        functions are canceled and the exception is re-thrown
546      * @return a future to cancel or await an outcome, or {@code null} if no futures were
547      *         created. If this future is canceled, then all of the futures will be
548      *         canceled
549      */
550     public CompletableFuture<OperationOutcome> anyOf(
551                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
552
553         return anyOf(Arrays.asList(futureMakers));
554     }
555
556     /**
557      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
558      * any outstanding futures when one completes.
559      *
560      * @param futureMakers function to make a future. If the function returns
561      *        {@code null}, then no future is created for that function. On the other
562      *        hand, if the function throws an exception, then the previously created
563      *        functions are canceled and the exception is re-thrown
564      * @return a future to cancel or await an outcome, or {@code null} if no futures were
565      *         created. If this future is canceled, then all of the futures will be
566      *         canceled. Similarly, when this future completes, any incomplete futures
567      *         will be canceled
568      */
569     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
570
571         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
572
573         CompletableFuture<OperationOutcome>[] futures =
574                         attachFutures(controller, futureMakers, UnaryOperator.identity());
575
576         if (futures.length == 0) {
577             // no futures were started
578             return null;
579         }
580
581         if (futures.length == 1) {
582             return futures[0];
583         }
584
585         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
586                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
587
588         return controller;
589     }
590
591     /**
592      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
593      *
594      * @param futureMakers function to make a future. If the function returns
595      *        {@code null}, then no future is created for that function. On the other
596      *        hand, if the function throws an exception, then the previously created
597      *        functions are canceled and the exception is re-thrown
598      * @return a future to cancel or await an outcome, or {@code null} if no futures were
599      *         created. If this future is canceled, then all of the futures will be
600      *         canceled
601      */
602     public CompletableFuture<OperationOutcome> allOf(
603                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
604
605         return allOf(Arrays.asList(futureMakers));
606     }
607
608     /**
609      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
610      *
611      * @param futureMakers function to make a future. If the function returns
612      *        {@code null}, then no future is created for that function. On the other
613      *        hand, if the function throws an exception, then the previously created
614      *        functions are canceled and the exception is re-thrown
615      * @return a future to cancel or await an outcome, or {@code null} if no futures were
616      *         created. If this future is canceled, then all of the futures will be
617      *         canceled. Similarly, when this future completes, any incomplete futures
618      *         will be canceled
619      */
620     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
621         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
622
623         Queue<OperationOutcome> outcomes = new LinkedList<>();
624
625         CompletableFuture<OperationOutcome>[] futures =
626                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
627                             synchronized (outcomes) {
628                                 outcomes.add(outcome);
629                             }
630                             return outcome;
631                         }));
632
633         if (futures.length == 0) {
634             // no futures were started
635             return null;
636         }
637
638         if (futures.length == 1) {
639             return futures[0];
640         }
641
642         // @formatter:off
643         CompletableFuture.allOf(futures)
644                         .thenApply(unused -> combineOutcomes(outcomes))
645                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
646         // @formatter:on
647
648         return controller;
649     }
650
651     /**
652      * Invokes the functions to create the futures and attaches them to the controller.
653      *
654      * @param controller master controller for all of the futures
655      * @param futureMakers futures to be attached to the controller
656      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
657      *        Returns the adorned future
658      * @return an array of futures, possibly zero-length. If the array is of size one,
659      *         then that one item should be returned instead of the controller
660      */
661     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
662                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
663                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
664
665         if (futureMakers.isEmpty()) {
666             @SuppressWarnings("unchecked")
667             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
668             return result;
669         }
670
671         // the last, unadorned future that is created
672         CompletableFuture<OperationOutcome> lastFuture = null;
673
674         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
675
676         // make each future
677         for (var maker : futureMakers) {
678             try {
679                 CompletableFuture<OperationOutcome> future = maker.get();
680                 if (future == null) {
681                     continue;
682                 }
683
684                 // propagate "stop" to the future
685                 controller.add(future);
686
687                 futures.add(adorn.apply(future));
688
689                 lastFuture = future;
690
691             } catch (RuntimeException e) {
692                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
693                 controller.cancel(false);
694                 throw e;
695             }
696         }
697
698         @SuppressWarnings("unchecked")
699         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
700
701         if (result.length == 1) {
702             // special case - return the unadorned future
703             result[0] = lastFuture;
704             return result;
705         }
706
707         return futures.toArray(result);
708     }
709
710     /**
711      * Combines the outcomes from a set of tasks.
712      *
713      * @param outcomes outcomes to be examined
714      * @return the combined outcome
715      */
716     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
717
718         // identify the outcome with the highest priority
719         OperationOutcome outcome = outcomes.remove();
720         int priority = detmPriority(outcome);
721
722         for (OperationOutcome outcome2 : outcomes) {
723             int priority2 = detmPriority(outcome2);
724
725             if (priority2 > priority) {
726                 outcome = outcome2;
727                 priority = priority2;
728             }
729         }
730
731         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
732                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
733
734         return outcome;
735     }
736
737     /**
738      * Determines the priority of an outcome based on its result.
739      *
740      * @param outcome outcome to examine, or {@code null}
741      * @return the outcome's priority
742      */
743     protected int detmPriority(OperationOutcome outcome) {
744         if (outcome == null || outcome.getResult() == null) {
745             return 1;
746         }
747
748         switch (outcome.getResult()) {
749             case SUCCESS:
750                 return 0;
751
752             case FAILURE_GUARD:
753                 return 2;
754
755             case FAILURE_RETRIES:
756                 return 3;
757
758             case FAILURE:
759                 return 4;
760
761             case FAILURE_TIMEOUT:
762                 return 5;
763
764             case FAILURE_EXCEPTION:
765             default:
766                 return 6;
767         }
768     }
769
770     /**
771      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
772      * not created until the previous task completes. The pipeline returns the outcome of
773      * the last task executed.
774      *
775      * @param futureMakers functions to make the futures
776      * @return a future to cancel the sequence or await the outcome
777      */
778     public CompletableFuture<OperationOutcome> sequence(
779                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
780
781         return sequence(Arrays.asList(futureMakers));
782     }
783
784     /**
785      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
786      * not created until the previous task completes. The pipeline returns the outcome of
787      * the last task executed.
788      *
789      * @param futureMakers functions to make the futures
790      * @return a future to cancel the sequence or await the outcome, or {@code null} if
791      *         there were no tasks to perform
792      */
793     public CompletableFuture<OperationOutcome> sequence(
794                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
795
796         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
797
798         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
799         if (nextTask == null) {
800             // no tasks
801             return null;
802         }
803
804         if (queue.isEmpty()) {
805             // only one task - just return it rather than wrapping it in a controller
806             return nextTask;
807         }
808
809         /*
810          * multiple tasks - need a controller to stop whichever task is currently
811          * executing
812          */
813         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
814         final Executor executor = params.getExecutor();
815
816         // @formatter:off
817         controller.wrap(nextTask)
818                     .thenCompose(nextTaskOnSuccess(controller, queue))
819                     .whenCompleteAsync(controller.delayedComplete(), executor);
820         // @formatter:on
821
822         return controller;
823     }
824
825     /**
826      * Executes the next task in the queue, if the previous outcome was successful.
827      *
828      * @param controller pipeline controller
829      * @param taskQueue queue of tasks to be performed
830      * @return a future to execute the remaining tasks, or the current outcome, if it's a
831      *         failure, or if there are no more tasks
832      */
833     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
834                     PipelineControllerFuture<OperationOutcome> controller,
835                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
836
837         return outcome -> {
838             if (!isSuccess(outcome)) {
839                 // return the failure
840                 return CompletableFuture.completedFuture(outcome);
841             }
842
843             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
844             if (nextTask == null) {
845                 // no tasks - just return the success
846                 return CompletableFuture.completedFuture(outcome);
847             }
848
849             // @formatter:off
850             return controller
851                         .wrap(nextTask)
852                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
853             // @formatter:on
854         };
855     }
856
857     /**
858      * Gets the next task from the queue, skipping those that are {@code null}.
859      *
860      * @param taskQueue task queue
861      * @return the next task, or {@code null} if the queue is now empty
862      */
863     private CompletableFuture<OperationOutcome> getNextTask(
864                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
865
866         Supplier<CompletableFuture<OperationOutcome>> maker;
867
868         while ((maker = taskQueue.poll()) != null) {
869             CompletableFuture<OperationOutcome> future = maker.get();
870             if (future != null) {
871                 return future;
872             }
873         }
874
875         return null;
876     }
877
878     /**
879      * Sets the start time of the operation and invokes the callback to indicate that the
880      * operation has started. Does nothing if the pipeline has been stopped.
881      * <p/>
882      * This assumes that the "outcome" is not {@code null}.
883      *
884      * @param callbacks used to determine if the start callback can be invoked
885      * @return a function that sets the start time and invokes the callback
886      */
887     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
888
889         return (outcome, thrown) -> {
890
891             if (callbacks.canStart()) {
892                 outcome.setStart(callbacks.getStartTime());
893                 outcome.setEnd(null);
894
895                 // pass a copy to the callback
896                 OperationOutcome outcome2 = new OperationOutcome(outcome);
897                 outcome2.setFinalOutcome(false);
898                 params.callbackStarted(outcome2);
899             }
900         };
901     }
902
903     /**
904      * Sets the end time of the operation and invokes the callback to indicate that the
905      * operation has completed. Does nothing if the pipeline has been stopped.
906      * <p/>
907      * This assumes that the "outcome" is not {@code null}.
908      * <p/>
909      * Note: the start time must be a reference rather than a plain value, because it's
910      * value must be gotten on-demand, when the returned function is executed at a later
911      * time.
912      *
913      * @param callbacks used to determine if the end callback can be invoked
914      * @return a function that sets the end time and invokes the callback
915      */
916     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
917
918         return (outcome, thrown) -> {
919             if (callbacks.canEnd()) {
920                 outcome.setStart(callbacks.getStartTime());
921                 outcome.setEnd(callbacks.getEndTime());
922
923                 // pass a copy to the callback
924                 params.callbackCompleted(new OperationOutcome(outcome));
925             }
926         };
927     }
928
929     /**
930      * Sets an operation's outcome and message, based on a throwable.
931      *
932      * @param operation operation to be updated
933      * @return the updated operation
934      */
935     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
936         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
937         return setOutcome(operation, result);
938     }
939
940     /**
941      * Sets an operation's outcome and default message based on the result.
942      *
943      * @param operation operation to be updated
944      * @param result result of the operation
945      * @return the updated operation
946      */
947     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
948         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
949         operation.setResult(result);
950         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
951                         : ControlLoopOperation.FAILED_MSG);
952
953         return operation;
954     }
955
956     /**
957      * Determines if a throwable is due to a timeout.
958      *
959      * @param thrown throwable of interest
960      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
961      */
962     protected boolean isTimeout(Throwable thrown) {
963         if (thrown instanceof CompletionException) {
964             thrown = thrown.getCause();
965         }
966
967         return (thrown instanceof TimeoutException);
968     }
969
970     /**
971      * Logs a response. If the response is not of type, String, then it attempts to
972      * pretty-print it into JSON before logging.
973      *
974      * @param direction IN or OUT
975      * @param infra communication infrastructure on which it was published
976      * @param source source name (e.g., the URL or Topic name)
977      * @param response response to be logged
978      * @return the JSON text that was logged
979      */
980     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
981         String json;
982         try {
983             if (response == null) {
984                 json = null;
985             } else if (response instanceof String) {
986                 json = response.toString();
987             } else {
988                 json = makeCoder().encode(response, true);
989             }
990
991         } catch (CoderException e) {
992             String type = (direction == EventType.IN ? "response" : "request");
993             logger.warn("cannot pretty-print {}", type, e);
994             json = response.toString();
995         }
996
997         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
998
999         return json;
1000     }
1001
1002     // these may be overridden by subclasses or junit tests
1003
1004     /**
1005      * Gets the retry count.
1006      *
1007      * @param retry retry, extracted from the parameters, or {@code null}
1008      * @return the number of retries, or {@code 0} if no retries were specified
1009      */
1010     protected int getRetry(Integer retry) {
1011         return (retry == null ? 0 : retry);
1012     }
1013
1014     /**
1015      * Gets the retry wait, in milliseconds.
1016      *
1017      * @return the retry wait, in milliseconds
1018      */
1019     protected long getRetryWaitMs() {
1020         return DEFAULT_RETRY_WAIT_MS;
1021     }
1022
1023     /**
1024      * Gets the operation timeout.
1025      *
1026      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1027      *        {@code null}
1028      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1029      *         specified
1030      */
1031     protected long getTimeoutMs(Integer timeoutSec) {
1032         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1033     }
1034
1035     // these may be overridden by junit tests
1036
1037     protected Coder makeCoder() {
1038         return coder;
1039     }
1040 }