Immediately finish if guard is disabled
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedHashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionException;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import java.util.function.BiConsumer;
38 import java.util.function.Function;
39 import java.util.function.Supplier;
40 import java.util.function.UnaryOperator;
41 import lombok.Getter;
42 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
43 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
44 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
45 import org.onap.policy.common.utils.coder.Coder;
46 import org.onap.policy.common.utils.coder.CoderException;
47 import org.onap.policy.common.utils.coder.StandardCoder;
48 import org.onap.policy.controlloop.ControlLoopOperation;
49 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
50 import org.onap.policy.controlloop.actorserviceprovider.Operation;
51 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
52 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
53 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
54 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
55 import org.onap.policy.controlloop.policy.PolicyResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 /**
60  * Partial implementation of an operator. In general, it's preferable that subclasses
61  * would override {@link #startOperationAsync(int, OperationOutcome)
62  * startOperationAsync()}. However, if that proves to be too difficult, then they can
63  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
64  * if the operation requires any preprocessor steps, the subclass may choose to override
65  * {@link #startPreprocessorAsync()}.
66  * <p/>
67  * The futures returned by the methods within this class can be canceled, and will
68  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
69  * returned by overridden methods will do the same. Of course, if a class overrides
70  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
71  * be done to cancel that particular operation.
72  * <p/>
73  * In general tasks in a pipeline are executed by the same thread. However, the following
74  * should always be executed via the executor specified in "params":
75  * <ul>
76  * <li>start callback</li>
77  * <li>completion callback</li>
78  * <li>controller completion (i.e., delayedComplete())</li>
79  * </ul>
80  */
81 public abstract class OperationPartial implements Operation {
    // logger shared by all instances of this class
    private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
    // NOTE(review): not referenced in the portion of the file visible here; presumably
    // used for encoding/decoding messages further down - confirm
    private static final Coder coder = new StandardCoder();

    // actor and operation names plugged into the parameters when a guard request is
    // started (see startGuardAsync())
    public static final String GUARD_ACTOR_NAME = "GUARD";
    public static final String GUARD_OPERATION_NAME = "Decision";
    // NOTE(review): presumably the default pause between retries, in milliseconds;
    // its use is outside the visible portion of the file - confirm
    public static final long DEFAULT_RETRY_WAIT_MS = 1000L;

    // operator configuration; supplies the blocking executor
    private final OperatorConfig config;

    /**
     * Operation parameters.
     */
    protected final ControlLoopOperationParams params;

    // "<actor>.<operation>", computed once by the constructor
    @Getter
    private final String fullName;
98
99
100     /**
101      * Constructs the object.
102      *
103      * @param params operation parameters
104      * @param config configuration for this operation
105      */
106     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
107         this.params = params;
108         this.config = config;
109         this.fullName = params.getActor() + "." + params.getOperation();
110     }
111
112     public Executor getBlockingExecutor() {
113         return config.getBlockingExecutor();
114     }
115
116     public String getActorName() {
117         return params.getActor();
118     }
119
120     public String getName() {
121         return params.getOperation();
122     }
123
124     @Override
125     public CompletableFuture<OperationOutcome> start() {
126         // allocate a controller for the entire operation
127         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
128
129         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
130         if (preproc == null) {
131             // no preprocessor required - just start the operation
132             return startOperationAttempt(controller, 1);
133         }
134
135         /*
136          * Do preprocessor first and then, if successful, start the operation. Note:
137          * operations create their own outcome, ignoring the outcome from any previous
138          * steps.
139          *
140          * Wrap the preprocessor to ensure "stop" is propagated to it.
141          */
142         // @formatter:off
143         controller.wrap(preproc)
144                         .exceptionally(fromException("preprocessor of operation"))
145                         .thenCompose(handlePreprocessorFailure(controller))
146                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
147                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
148         // @formatter:on
149
150         return controller;
151     }
152
153     /**
154      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
155      * invokes the call-backs, marks the controller complete, and returns an incomplete
156      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
157      * received.
158      * <p/>
159      * Assumes that no callbacks have been invoked yet.
160      *
161      * @param controller pipeline controller
162      * @return a function that checks the outcome status and continues, if successful, or
163      *         indicates a failure otherwise
164      */
165     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
166                     PipelineControllerFuture<OperationOutcome> controller) {
167
168         return outcome -> {
169
170             if (isSuccess(outcome)) {
171                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
172                 return CompletableFuture.completedFuture(outcome);
173             }
174
175             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
176
177             final Executor executor = params.getExecutor();
178             final CallbackManager callbacks = new CallbackManager();
179
180             // propagate "stop" to the callbacks
181             controller.add(callbacks);
182
183             final OperationOutcome outcome2 = params.makeOutcome();
184
185             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
186
187             outcome2.setFinalOutcome(true);
188             outcome2.setResult(PolicyResult.FAILURE_GUARD);
189             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
190
191             // @formatter:off
192             CompletableFuture.completedFuture(outcome2)
193                             .whenCompleteAsync(callbackStarted(callbacks), executor)
194                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
195                             .whenCompleteAsync(controller.delayedComplete(), executor);
196             // @formatter:on
197
198             return new CompletableFuture<>();
199         };
200     }
201
202     /**
203      * Invokes the operation's preprocessor step(s) as a "future". This method simply
204      * returns {@code null}.
205      * <p/>
206      * This method assumes the following:
207      * <ul>
208      * <li>the operator is alive</li>
209      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
210      * </ul>
211      *
212      * @return a function that will start the preprocessor and returns its outcome, or
213      *         {@code null} if this operation needs no preprocessor
214      */
215     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
216         return null;
217     }
218
219     /**
220      * Invokes the operation's guard step(s) as a "future".
221      * <p/>
222      * This method assumes the following:
223      * <ul>
224      * <li>the operator is alive</li>
225      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
226      * </ul>
227      *
228      * @return a function that will start the guard checks and returns its outcome, or
229      *         {@code null} if this operation has no guard
230      */
231     protected CompletableFuture<OperationOutcome> startGuardAsync() {
232         // get the guard payload
233         Map<String, Object> guardPayload = makeGuardPayload();
234
235         // wrap it in a "resource"
236         Map<String, Object> resource = new LinkedHashMap<>();
237         resource.put("guard", guardPayload);
238
239         Map<String, Object> payload = new LinkedHashMap<>();
240         payload.put("resource", resource);
241
242         /*
243          * Note: can't use constants from actor.guard, because that would create a
244          * circular dependency.
245          */
246         return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
247                         .payload(payload).build().start();
248     }
249
250     /**
251      * Creates a payload to execute a guard operation.
252      *
253      * @return a new guard payload
254      */
255     protected Map<String, Object> makeGuardPayload() {
256         Map<String, Object> guard = new LinkedHashMap<>();
257         guard.put("actor", params.getActor());
258         guard.put("recipe", params.getOperation());
259         guard.put("target", params.getTargetEntity());
260         guard.put("requestId", params.getRequestId());
261
262         String clname = params.getContext().getEvent().getClosedLoopControlName();
263         if (clname != null) {
264             guard.put("clname", clname);
265         }
266
267         return guard;
268     }
269
270     /**
271      * Starts the operation attempt, with no preprocessor. When all retries complete, it
272      * will complete the controller.
273      *
274      * @param controller controller for all operation attempts
275      * @param attempt attempt number, typically starting with 1
276      * @return a future that will return the final result of all attempts
277      */
278     private CompletableFuture<OperationOutcome> startOperationAttempt(
279                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
280
281         // propagate "stop" to the operation attempt
282         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
283                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
284
285         return controller;
286     }
287
288     /**
289      * Starts the operation attempt, without doing any retries.
290      *
291      * @param params operation parameters
292      * @param attempt attempt number, typically starting with 1
293      * @return a future that will return the result of a single operation attempt
294      */
295     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
296
297         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
298
299         final Executor executor = params.getExecutor();
300         final OperationOutcome outcome = params.makeOutcome();
301         final CallbackManager callbacks = new CallbackManager();
302
303         // this operation attempt gets its own controller
304         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
305
306         // propagate "stop" to the callbacks
307         controller.add(callbacks);
308
309         // @formatter:off
310         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
311                         .whenCompleteAsync(callbackStarted(callbacks), executor)
312                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
313         // @formatter:on
314
315         // handle timeouts, if specified
316         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
317         if (timeoutMillis > 0) {
318             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
319             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
320         }
321
322         /*
323          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
324          * before callbackCompleted() is invoked.
325          *
326          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
327          * the pipeline as the last step anyway.
328          */
329
330         // @formatter:off
331         future.exceptionally(fromException("operation"))
332                     .thenApply(setRetryFlag(attempt))
333                     .whenCompleteAsync(callbackStarted(callbacks), executor)
334                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
335                     .whenCompleteAsync(controller.delayedComplete(), executor);
336         // @formatter:on
337
338         return controller;
339     }
340
341     /**
342      * Determines if the outcome was successful.
343      *
344      * @param outcome outcome to examine
345      * @return {@code true} if the outcome was successful
346      */
347     protected boolean isSuccess(OperationOutcome outcome) {
348         return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
349     }
350
351     /**
352      * Determines if the outcome was a failure for this operator.
353      *
354      * @param outcome outcome to examine, or {@code null}
355      * @return {@code true} if the outcome is not {@code null} and was a failure
356      *         <i>and</i> was associated with this operator, {@code false} otherwise
357      */
358     protected boolean isActorFailed(OperationOutcome outcome) {
359         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
360     }
361
362     /**
363      * Determines if the given outcome is for this operation.
364      *
365      * @param outcome outcome to examine
366      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
367      */
368     protected boolean isSameOperation(OperationOutcome outcome) {
369         return OperationOutcome.isFor(outcome, getActorName(), getName());
370     }
371
372     /**
373      * Invokes the operation as a "future". This method simply invokes
374      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
375      * returning the result via a "future".
376      * <p/>
377      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
378      * the executor in the "params", as that may bring the background thread pool to a
379      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
380      * instead.
381      * <p/>
382      * This method assumes the following:
383      * <ul>
384      * <li>the operator is alive</li>
385      * <li>verifyRunning() has been invoked</li>
386      * <li>callbackStarted() has been invoked</li>
387      * <li>the invoker will perform appropriate timeout checks</li>
388      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
389      * </ul>
390      *
391      * @param attempt attempt number, typically starting with 1
392      * @return a function that will start the operation and return its result when
393      *         complete
394      */
395     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
396
397         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
398     }
399
400     /**
401      * Low-level method that performs the operation. This can make the same assumptions
402      * that are made by {@link #doOperationAsFuture()}. This particular method simply
403      * throws an {@link UnsupportedOperationException}.
404      *
405      * @param attempt attempt number, typically starting with 1
406      * @param operation the operation being performed
407      * @return the outcome of the operation
408      */
409     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
410
411         throw new UnsupportedOperationException("start operation " + getFullName());
412     }
413
414     /**
415      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
416      * FAILURE, assuming the policy specifies retries and the retry count has been
417      * exhausted.
418      *
419      * @param attempt latest attempt number, starting with 1
420      * @return a function to get the next future to execute
421      */
422     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
423
424         return origOutcome -> {
425             // ensure we have a non-null outcome
426             OperationOutcome outcome;
427             if (origOutcome != null) {
428                 outcome = origOutcome;
429             } else {
430                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
431                 outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
432             }
433
434             // ensure correct actor/operation
435             outcome.setActor(getActorName());
436             outcome.setOperation(getName());
437
438             // determine if we should retry, based on the result
439             if (outcome.getResult() != PolicyResult.FAILURE) {
440                 // do not retry success or other failure types (e.g., exception)
441                 outcome.setFinalOutcome(true);
442                 return outcome;
443             }
444
445             int retry = getRetry(params.getRetry());
446             if (retry <= 0) {
447                 // no retries were specified
448                 outcome.setFinalOutcome(true);
449
450             } else if (attempt <= retry) {
451                 // have more retries - not the final outcome
452                 outcome.setFinalOutcome(false);
453
454             } else {
455                 /*
456                  * retries were specified and we've already tried them all - change to
457                  * FAILURE_RETRIES
458                  */
459                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
460                 outcome.setResult(PolicyResult.FAILURE_RETRIES);
461                 outcome.setFinalOutcome(true);
462             }
463
464             return outcome;
465         };
466     }
467
468     /**
469      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
470      * was previously invoked, and thus that the "operation" is not {@code null}.
471      *
472      * @param controller controller for all of the retries
473      * @param attempt latest attempt number, starting with 1
474      * @return a function to get the next future to execute
475      */
476     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
477                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
478
479         return operation -> {
480             if (!isActorFailed(operation)) {
481                 // wrong type or wrong operation - just leave it as is
482                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
483                 controller.complete(operation);
484                 return new CompletableFuture<>();
485             }
486
487             if (getRetry(params.getRetry()) <= 0) {
488                 // no retries - already marked as FAILURE, so just return it
489                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
490                 controller.complete(operation);
491                 return new CompletableFuture<>();
492             }
493
494             /*
495              * Retry the operation.
496              */
497             long waitMs = getRetryWaitMs();
498             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
499
500             return sleep(waitMs, TimeUnit.MILLISECONDS)
501                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
502         };
503     }
504
505     /**
506      * Convenience method that starts a sleep(), running via a future.
507      *
508      * @param sleepTime time to sleep
509      * @param unit time unit
510      * @return a future that will complete when the sleep completes
511      */
512     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
513         if (sleepTime <= 0) {
514             return CompletableFuture.completedFuture(null);
515         }
516
517         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
518     }
519
520     /**
521      * Converts an exception into an operation outcome, returning a copy of the outcome to
522      * prevent background jobs from changing it.
523      *
524      * @param type type of item throwing the exception
525      * @return a function that will convert an exception into an operation outcome
526      */
527     private Function<Throwable, OperationOutcome> fromException(String type) {
528
529         return thrown -> {
530             OperationOutcome outcome = params.makeOutcome();
531
532             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
533                 // do not include exception in the message, as it just clutters the log
534                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
535                                 params.getRequestId());
536             } else {
537                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
538                                 params.getRequestId(), thrown);
539             }
540
541             return setOutcome(outcome, thrown);
542         };
543     }
544
545     /**
546      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
547      * any outstanding futures when one completes.
548      *
549      * @param futureMakers function to make a future. If the function returns
550      *        {@code null}, then no future is created for that function. On the other
551      *        hand, if the function throws an exception, then the previously created
552      *        functions are canceled and the exception is re-thrown
553      * @return a future to cancel or await an outcome, or {@code null} if no futures were
554      *         created. If this future is canceled, then all of the futures will be
555      *         canceled
556      */
557     public CompletableFuture<OperationOutcome> anyOf(
558                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
559
560         return anyOf(Arrays.asList(futureMakers));
561     }
562
563     /**
564      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
565      * any outstanding futures when one completes.
566      *
567      * @param futureMakers function to make a future. If the function returns
568      *        {@code null}, then no future is created for that function. On the other
569      *        hand, if the function throws an exception, then the previously created
570      *        functions are canceled and the exception is re-thrown
571      * @return a future to cancel or await an outcome, or {@code null} if no futures were
572      *         created. If this future is canceled, then all of the futures will be
573      *         canceled. Similarly, when this future completes, any incomplete futures
574      *         will be canceled
575      */
576     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
577
578         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
579
580         CompletableFuture<OperationOutcome>[] futures =
581                         attachFutures(controller, futureMakers, UnaryOperator.identity());
582
583         if (futures.length == 0) {
584             // no futures were started
585             return null;
586         }
587
588         if (futures.length == 1) {
589             return futures[0];
590         }
591
592         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
593                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
594
595         return controller;
596     }
597
598     /**
599      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
600      *
601      * @param futureMakers function to make a future. If the function returns
602      *        {@code null}, then no future is created for that function. On the other
603      *        hand, if the function throws an exception, then the previously created
604      *        functions are canceled and the exception is re-thrown
605      * @return a future to cancel or await an outcome, or {@code null} if no futures were
606      *         created. If this future is canceled, then all of the futures will be
607      *         canceled
608      */
609     public CompletableFuture<OperationOutcome> allOf(
610                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
611
612         return allOf(Arrays.asList(futureMakers));
613     }
614
615     /**
616      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
617      *
618      * @param futureMakers function to make a future. If the function returns
619      *        {@code null}, then no future is created for that function. On the other
620      *        hand, if the function throws an exception, then the previously created
621      *        functions are canceled and the exception is re-thrown
622      * @return a future to cancel or await an outcome, or {@code null} if no futures were
623      *         created. If this future is canceled, then all of the futures will be
624      *         canceled. Similarly, when this future completes, any incomplete futures
625      *         will be canceled
626      */
627     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
628         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
629
630         Queue<OperationOutcome> outcomes = new LinkedList<>();
631
632         CompletableFuture<OperationOutcome>[] futures =
633                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
634                             synchronized (outcomes) {
635                                 outcomes.add(outcome);
636                             }
637                             return outcome;
638                         }));
639
640         if (futures.length == 0) {
641             // no futures were started
642             return null;
643         }
644
645         if (futures.length == 1) {
646             return futures[0];
647         }
648
649         // @formatter:off
650         CompletableFuture.allOf(futures)
651                         .thenApply(unused -> combineOutcomes(outcomes))
652                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
653         // @formatter:on
654
655         return controller;
656     }
657
658     /**
659      * Invokes the functions to create the futures and attaches them to the controller.
660      *
661      * @param controller master controller for all of the futures
662      * @param futureMakers futures to be attached to the controller
663      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
664      *        Returns the adorned future
665      * @return an array of futures, possibly zero-length. If the array is of size one,
666      *         then that one item should be returned instead of the controller
667      */
668     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
669                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
670                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
671
672         if (futureMakers.isEmpty()) {
673             @SuppressWarnings("unchecked")
674             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
675             return result;
676         }
677
678         // the last, unadorned future that is created
679         CompletableFuture<OperationOutcome> lastFuture = null;
680
681         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
682
683         // make each future
684         for (var maker : futureMakers) {
685             try {
686                 CompletableFuture<OperationOutcome> future = maker.get();
687                 if (future == null) {
688                     continue;
689                 }
690
691                 // propagate "stop" to the future
692                 controller.add(future);
693
694                 futures.add(adorn.apply(future));
695
696                 lastFuture = future;
697
698             } catch (RuntimeException e) {
699                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
700                 controller.cancel(false);
701                 throw e;
702             }
703         }
704
705         @SuppressWarnings("unchecked")
706         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
707
708         if (result.length == 1) {
709             // special case - return the unadorned future
710             result[0] = lastFuture;
711             return result;
712         }
713
714         return futures.toArray(result);
715     }
716
717     /**
718      * Combines the outcomes from a set of tasks.
719      *
720      * @param outcomes outcomes to be examined
721      * @return the combined outcome
722      */
723     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
724
725         // identify the outcome with the highest priority
726         OperationOutcome outcome = outcomes.remove();
727         int priority = detmPriority(outcome);
728
729         for (OperationOutcome outcome2 : outcomes) {
730             int priority2 = detmPriority(outcome2);
731
732             if (priority2 > priority) {
733                 outcome = outcome2;
734                 priority = priority2;
735             }
736         }
737
738         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
739                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
740
741         return outcome;
742     }
743
744     /**
745      * Determines the priority of an outcome based on its result.
746      *
747      * @param outcome outcome to examine, or {@code null}
748      * @return the outcome's priority
749      */
750     protected int detmPriority(OperationOutcome outcome) {
751         if (outcome == null || outcome.getResult() == null) {
752             return 1;
753         }
754
755         switch (outcome.getResult()) {
756             case SUCCESS:
757                 return 0;
758
759             case FAILURE_GUARD:
760                 return 2;
761
762             case FAILURE_RETRIES:
763                 return 3;
764
765             case FAILURE:
766                 return 4;
767
768             case FAILURE_TIMEOUT:
769                 return 5;
770
771             case FAILURE_EXCEPTION:
772             default:
773                 return 6;
774         }
775     }
776
777     /**
778      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
779      * not created until the previous task completes. The pipeline returns the outcome of
780      * the last task executed.
781      *
782      * @param futureMakers functions to make the futures
783      * @return a future to cancel the sequence or await the outcome
784      */
785     public CompletableFuture<OperationOutcome> sequence(
786                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
787
788         return sequence(Arrays.asList(futureMakers));
789     }
790
791     /**
792      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
793      * not created until the previous task completes. The pipeline returns the outcome of
794      * the last task executed.
795      *
796      * @param futureMakers functions to make the futures
797      * @return a future to cancel the sequence or await the outcome, or {@code null} if
798      *         there were no tasks to perform
799      */
800     public CompletableFuture<OperationOutcome> sequence(
801                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
802
803         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
804
805         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
806         if (nextTask == null) {
807             // no tasks
808             return null;
809         }
810
811         if (queue.isEmpty()) {
812             // only one task - just return it rather than wrapping it in a controller
813             return nextTask;
814         }
815
816         /*
817          * multiple tasks - need a controller to stop whichever task is currently
818          * executing
819          */
820         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
821         final Executor executor = params.getExecutor();
822
823         // @formatter:off
824         controller.wrap(nextTask)
825                     .thenCompose(nextTaskOnSuccess(controller, queue))
826                     .whenCompleteAsync(controller.delayedComplete(), executor);
827         // @formatter:on
828
829         return controller;
830     }
831
832     /**
833      * Executes the next task in the queue, if the previous outcome was successful.
834      *
835      * @param controller pipeline controller
836      * @param taskQueue queue of tasks to be performed
837      * @return a future to execute the remaining tasks, or the current outcome, if it's a
838      *         failure, or if there are no more tasks
839      */
840     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
841                     PipelineControllerFuture<OperationOutcome> controller,
842                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
843
844         return outcome -> {
845             if (!isSuccess(outcome)) {
846                 // return the failure
847                 return CompletableFuture.completedFuture(outcome);
848             }
849
850             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
851             if (nextTask == null) {
852                 // no tasks - just return the success
853                 return CompletableFuture.completedFuture(outcome);
854             }
855
856             // @formatter:off
857             return controller
858                         .wrap(nextTask)
859                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
860             // @formatter:on
861         };
862     }
863
864     /**
865      * Gets the next task from the queue, skipping those that are {@code null}.
866      *
867      * @param taskQueue task queue
868      * @return the next task, or {@code null} if the queue is now empty
869      */
870     private CompletableFuture<OperationOutcome> getNextTask(
871                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
872
873         Supplier<CompletableFuture<OperationOutcome>> maker;
874
875         while ((maker = taskQueue.poll()) != null) {
876             CompletableFuture<OperationOutcome> future = maker.get();
877             if (future != null) {
878                 return future;
879             }
880         }
881
882         return null;
883     }
884
885     /**
886      * Sets the start time of the operation and invokes the callback to indicate that the
887      * operation has started. Does nothing if the pipeline has been stopped.
888      * <p/>
889      * This assumes that the "outcome" is not {@code null}.
890      *
891      * @param callbacks used to determine if the start callback can be invoked
892      * @return a function that sets the start time and invokes the callback
893      */
894     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
895
896         return (outcome, thrown) -> {
897
898             if (callbacks.canStart()) {
899                 outcome.setStart(callbacks.getStartTime());
900                 outcome.setEnd(null);
901
902                 // pass a copy to the callback
903                 OperationOutcome outcome2 = new OperationOutcome(outcome);
904                 outcome2.setFinalOutcome(false);
905                 params.callbackStarted(outcome2);
906             }
907         };
908     }
909
910     /**
911      * Sets the end time of the operation and invokes the callback to indicate that the
912      * operation has completed. Does nothing if the pipeline has been stopped.
913      * <p/>
914      * This assumes that the "outcome" is not {@code null}.
915      * <p/>
916      * Note: the start time must be a reference rather than a plain value, because it's
917      * value must be gotten on-demand, when the returned function is executed at a later
918      * time.
919      *
920      * @param callbacks used to determine if the end callback can be invoked
921      * @return a function that sets the end time and invokes the callback
922      */
923     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
924
925         return (outcome, thrown) -> {
926             if (callbacks.canEnd()) {
927                 outcome.setStart(callbacks.getStartTime());
928                 outcome.setEnd(callbacks.getEndTime());
929
930                 // pass a copy to the callback
931                 params.callbackCompleted(new OperationOutcome(outcome));
932             }
933         };
934     }
935
936     /**
937      * Sets an operation's outcome and message, based on a throwable.
938      *
939      * @param operation operation to be updated
940      * @return the updated operation
941      */
942     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
943         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
944         return setOutcome(operation, result);
945     }
946
947     /**
948      * Sets an operation's outcome and default message based on the result.
949      *
950      * @param operation operation to be updated
951      * @param result result of the operation
952      * @return the updated operation
953      */
954     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
955         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
956         operation.setResult(result);
957         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
958                         : ControlLoopOperation.FAILED_MSG);
959
960         return operation;
961     }
962
963     /**
964      * Determines if a throwable is due to a timeout.
965      *
966      * @param thrown throwable of interest
967      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
968      */
969     protected boolean isTimeout(Throwable thrown) {
970         if (thrown instanceof CompletionException) {
971             thrown = thrown.getCause();
972         }
973
974         return (thrown instanceof TimeoutException);
975     }
976
977     /**
978      * Logs a response. If the response is not of type, String, then it attempts to
979      * pretty-print it into JSON before logging.
980      *
981      * @param direction IN or OUT
982      * @param infra communication infrastructure on which it was published
983      * @param source source name (e.g., the URL or Topic name)
984      * @param response response to be logged
985      * @return the JSON text that was logged
986      */
987     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
988         String json;
989         try {
990             if (response == null) {
991                 json = null;
992             } else if (response instanceof String) {
993                 json = response.toString();
994             } else {
995                 json = makeCoder().encode(response, true);
996             }
997
998         } catch (CoderException e) {
999             String type = (direction == EventType.IN ? "response" : "request");
1000             logger.warn("cannot pretty-print {}", type, e);
1001             json = response.toString();
1002         }
1003
1004         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1005
1006         return json;
1007     }
1008
1009     // these may be overridden by subclasses or junit tests
1010
1011     /**
1012      * Gets the retry count.
1013      *
1014      * @param retry retry, extracted from the parameters, or {@code null}
1015      * @return the number of retries, or {@code 0} if no retries were specified
1016      */
1017     protected int getRetry(Integer retry) {
1018         return (retry == null ? 0 : retry);
1019     }
1020
1021     /**
1022      * Gets the retry wait, in milliseconds.
1023      *
1024      * @return the retry wait, in milliseconds
1025      */
1026     protected long getRetryWaitMs() {
1027         return DEFAULT_RETRY_WAIT_MS;
1028     }
1029
1030     /**
1031      * Gets the operation timeout.
1032      *
1033      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1034      *        {@code null}
1035      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1036      *         specified
1037      */
1038     protected long getTimeoutMs(Integer timeoutSec) {
1039         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1040     }
1041
1042     // these may be overridden by junit tests
1043
1044     protected Coder makeCoder() {
1045         return coder;
1046     }
1047 }