Change Actor makeCoder() to getCoder()
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedHashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.UUID;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.CompletionException;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.function.BiConsumer;
39 import java.util.function.Function;
40 import java.util.function.Supplier;
41 import java.util.function.UnaryOperator;
42 import lombok.AccessLevel;
43 import lombok.Getter;
44 import lombok.Setter;
45 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
46 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
48 import org.onap.policy.common.utils.coder.Coder;
49 import org.onap.policy.common.utils.coder.CoderException;
50 import org.onap.policy.common.utils.coder.StandardCoder;
51 import org.onap.policy.controlloop.ControlLoopOperation;
52 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
53 import org.onap.policy.controlloop.actorserviceprovider.Operation;
54 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
55 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
56 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
57 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
58 import org.onap.policy.controlloop.policy.PolicyResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * Partial implementation of an operator. In general, it's preferable that subclasses
64  * would override {@link #startOperationAsync(int, OperationOutcome)
65  * startOperationAsync()}. However, if that proves to be too difficult, then they can
66  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
67  * if the operation requires any preprocessor steps, the subclass may choose to override
68  * {@link #startPreprocessorAsync()}.
69  * <p/>
70  * The futures returned by the methods within this class can be canceled, and will
71  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
72  * returned by overridden methods will do the same. Of course, if a class overrides
73  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
74  * be done to cancel that particular operation.
75  * <p/>
76  * In general tasks in a pipeline are executed by the same thread. However, the following
77  * should always be executed via the executor specified in "params":
78  * <ul>
79  * <li>start callback</li>
80  * <li>completion callback</li>
81  * <li>controller completion (i.e., delayedComplete())</li>
82  * </ul>
83  */
84 public abstract class OperationPartial implements Operation {
85     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
86     private static final Coder coder = new StandardCoder();
87
88     public static final String GUARD_ACTOR_NAME = "GUARD";
89     public static final String GUARD_OPERATION_NAME = "Decision";
90     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
91
92     private final OperatorConfig config;
93
94     /**
95      * Operation parameters.
96      */
97     protected final ControlLoopOperationParams params;
98
99     @Getter
100     private final String fullName;
101
102     @Getter
103     @Setter(AccessLevel.PROTECTED)
104     private String subRequestId;
105
106
107     /**
108      * Constructs the object.
109      *
110      * @param params operation parameters
111      * @param config configuration for this operation
112      */
113     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
114         this.params = params;
115         this.config = config;
116         this.fullName = params.getActor() + "." + params.getOperation();
117     }
118
119     public Executor getBlockingExecutor() {
120         return config.getBlockingExecutor();
121     }
122
123     public String getActorName() {
124         return params.getActor();
125     }
126
127     public String getName() {
128         return params.getOperation();
129     }
130
131     @Override
132     public CompletableFuture<OperationOutcome> start() {
133         // allocate a controller for the entire operation
134         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
135
136         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
137         if (preproc == null) {
138             // no preprocessor required - just start the operation
139             return startOperationAttempt(controller, 1);
140         }
141
142         /*
143          * Do preprocessor first and then, if successful, start the operation. Note:
144          * operations create their own outcome, ignoring the outcome from any previous
145          * steps.
146          *
147          * Wrap the preprocessor to ensure "stop" is propagated to it.
148          */
149         // @formatter:off
150         controller.wrap(preproc)
151                         .exceptionally(fromException("preprocessor of operation"))
152                         .thenCompose(handlePreprocessorFailure(controller))
153                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
154                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
155         // @formatter:on
156
157         return controller;
158     }
159
160     /**
161      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
162      * invokes the call-backs, marks the controller complete, and returns an incomplete
163      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
164      * received.
165      * <p/>
166      * Assumes that no callbacks have been invoked yet.
167      *
168      * @param controller pipeline controller
169      * @return a function that checks the outcome status and continues, if successful, or
170      *         indicates a failure otherwise
171      */
172     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
173                     PipelineControllerFuture<OperationOutcome> controller) {
174
175         return outcome -> {
176
177             if (isSuccess(outcome)) {
178                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
179                 return CompletableFuture.completedFuture(outcome);
180             }
181
182             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
183
184             final Executor executor = params.getExecutor();
185             final CallbackManager callbacks = new CallbackManager();
186
187             // propagate "stop" to the callbacks
188             controller.add(callbacks);
189
190             final OperationOutcome outcome2 = params.makeOutcome();
191
192             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
193
194             outcome2.setFinalOutcome(true);
195             outcome2.setResult(PolicyResult.FAILURE_GUARD);
196             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
197
198             // @formatter:off
199             CompletableFuture.completedFuture(outcome2)
200                             .whenCompleteAsync(callbackStarted(callbacks), executor)
201                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
202                             .whenCompleteAsync(controller.delayedComplete(), executor);
203             // @formatter:on
204
205             return new CompletableFuture<>();
206         };
207     }
208
209     /**
210      * Invokes the operation's preprocessor step(s) as a "future". This method simply
211      * returns {@code null}.
212      * <p/>
213      * This method assumes the following:
214      * <ul>
215      * <li>the operator is alive</li>
216      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
217      * </ul>
218      *
219      * @return a function that will start the preprocessor and returns its outcome, or
220      *         {@code null} if this operation needs no preprocessor
221      */
222     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
223         return null;
224     }
225
226     /**
227      * Invokes the operation's guard step(s) as a "future".
228      * <p/>
229      * This method assumes the following:
230      * <ul>
231      * <li>the operator is alive</li>
232      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
233      * </ul>
234      *
235      * @return a function that will start the guard checks and returns its outcome, or
236      *         {@code null} if this operation has no guard
237      */
238     protected CompletableFuture<OperationOutcome> startGuardAsync() {
239         // get the guard payload
240         Map<String, Object> payload = makeGuardPayload();
241
242         /*
243          * Note: can't use constants from actor.guard, because that would create a
244          * circular dependency.
245          */
246         return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
247                         .payload(payload).build().start();
248     }
249
250     /**
251      * Creates a payload to execute a guard operation.
252      *
253      * @return a new guard payload
254      */
255     protected Map<String, Object> makeGuardPayload() {
256         Map<String, Object> guard = new LinkedHashMap<>();
257         guard.put("actor", params.getActor());
258         guard.put("operation", params.getOperation());
259         guard.put("target", params.getTargetEntity());
260         guard.put("requestId", params.getRequestId());
261
262         String clname = params.getContext().getEvent().getClosedLoopControlName();
263         if (clname != null) {
264             guard.put("clname", clname);
265         }
266
267         return guard;
268     }
269
270     /**
271      * Starts the operation attempt, with no preprocessor. When all retries complete, it
272      * will complete the controller.
273      *
274      * @param controller controller for all operation attempts
275      * @param attempt attempt number, typically starting with 1
276      * @return a future that will return the final result of all attempts
277      */
278     private CompletableFuture<OperationOutcome> startOperationAttempt(
279                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
280
281         generateSubRequestId(attempt);
282
283         // propagate "stop" to the operation attempt
284         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
285                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
286
287         return controller;
288     }
289
290     /**
291      * Generates and sets {@link #subRequestId} to a new subrequest ID.
292      * @param attempt attempt number, typically starting with 1
293      */
294     public void generateSubRequestId(int attempt) {
295         // Note: this should be "protected", but that makes junits much messier
296
297         setSubRequestId(UUID.randomUUID().toString());
298     }
299
300     /**
301      * Starts the operation attempt, without doing any retries.
302      *
303      * @param params operation parameters
304      * @param attempt attempt number, typically starting with 1
305      * @return a future that will return the result of a single operation attempt
306      */
307     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
308
309         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
310
311         final Executor executor = params.getExecutor();
312         final OperationOutcome outcome = params.makeOutcome();
313         final CallbackManager callbacks = new CallbackManager();
314
315         // this operation attempt gets its own controller
316         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
317
318         // propagate "stop" to the callbacks
319         controller.add(callbacks);
320
321         // @formatter:off
322         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
323                         .whenCompleteAsync(callbackStarted(callbacks), executor)
324                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
325         // @formatter:on
326
327         // handle timeouts, if specified
328         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
329         if (timeoutMillis > 0) {
330             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
331             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
332         }
333
334         /*
335          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
336          * before callbackCompleted() is invoked.
337          *
338          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
339          * the pipeline as the last step anyway.
340          */
341
342         // @formatter:off
343         future.exceptionally(fromException("operation"))
344                     .thenApply(setRetryFlag(attempt))
345                     .whenCompleteAsync(callbackStarted(callbacks), executor)
346                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
347                     .whenCompleteAsync(controller.delayedComplete(), executor);
348         // @formatter:on
349
350         return controller;
351     }
352
353     /**
354      * Determines if the outcome was successful.
355      *
356      * @param outcome outcome to examine
357      * @return {@code true} if the outcome was successful
358      */
359     protected boolean isSuccess(OperationOutcome outcome) {
360         return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
361     }
362
363     /**
364      * Determines if the outcome was a failure for this operator.
365      *
366      * @param outcome outcome to examine, or {@code null}
367      * @return {@code true} if the outcome is not {@code null} and was a failure
368      *         <i>and</i> was associated with this operator, {@code false} otherwise
369      */
370     protected boolean isActorFailed(OperationOutcome outcome) {
371         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
372     }
373
374     /**
375      * Determines if the given outcome is for this operation.
376      *
377      * @param outcome outcome to examine
378      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
379      */
380     protected boolean isSameOperation(OperationOutcome outcome) {
381         return OperationOutcome.isFor(outcome, getActorName(), getName());
382     }
383
384     /**
385      * Invokes the operation as a "future". This method simply invokes
386      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
387      * returning the result via a "future".
388      * <p/>
389      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
390      * the executor in the "params", as that may bring the background thread pool to a
391      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
392      * instead.
393      * <p/>
394      * This method assumes the following:
395      * <ul>
396      * <li>the operator is alive</li>
397      * <li>verifyRunning() has been invoked</li>
398      * <li>callbackStarted() has been invoked</li>
399      * <li>the invoker will perform appropriate timeout checks</li>
400      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
401      * </ul>
402      *
403      * @param attempt attempt number, typically starting with 1
404      * @return a function that will start the operation and return its result when
405      *         complete
406      */
407     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
408
409         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
410     }
411
412     /**
413      * Low-level method that performs the operation. This can make the same assumptions
414      * that are made by {@link #doOperationAsFuture()}. This particular method simply
415      * throws an {@link UnsupportedOperationException}.
416      *
417      * @param attempt attempt number, typically starting with 1
418      * @param operation the operation being performed
419      * @return the outcome of the operation
420      */
421     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
422
423         throw new UnsupportedOperationException("start operation " + getFullName());
424     }
425
426     /**
427      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
428      * FAILURE, assuming the policy specifies retries and the retry count has been
429      * exhausted.
430      *
431      * @param attempt latest attempt number, starting with 1
432      * @return a function to get the next future to execute
433      */
434     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
435
436         return origOutcome -> {
437             // ensure we have a non-null outcome
438             OperationOutcome outcome;
439             if (origOutcome != null) {
440                 outcome = origOutcome;
441             } else {
442                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
443                 outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
444             }
445
446             // ensure correct actor/operation
447             outcome.setActor(getActorName());
448             outcome.setOperation(getName());
449
450             // determine if we should retry, based on the result
451             if (outcome.getResult() != PolicyResult.FAILURE) {
452                 // do not retry success or other failure types (e.g., exception)
453                 outcome.setFinalOutcome(true);
454                 return outcome;
455             }
456
457             int retry = getRetry(params.getRetry());
458             if (retry <= 0) {
459                 // no retries were specified
460                 outcome.setFinalOutcome(true);
461
462             } else if (attempt <= retry) {
463                 // have more retries - not the final outcome
464                 outcome.setFinalOutcome(false);
465
466             } else {
467                 /*
468                  * retries were specified and we've already tried them all - change to
469                  * FAILURE_RETRIES
470                  */
471                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
472                 outcome.setResult(PolicyResult.FAILURE_RETRIES);
473                 outcome.setFinalOutcome(true);
474             }
475
476             return outcome;
477         };
478     }
479
480     /**
481      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
482      * was previously invoked, and thus that the "operation" is not {@code null}.
483      *
484      * @param controller controller for all of the retries
485      * @param attempt latest attempt number, starting with 1
486      * @return a function to get the next future to execute
487      */
488     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
489                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
490
491         return operation -> {
492             if (!isActorFailed(operation)) {
493                 // wrong type or wrong operation - just leave it as is
494                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
495                 controller.complete(operation);
496                 return new CompletableFuture<>();
497             }
498
499             if (getRetry(params.getRetry()) <= 0) {
500                 // no retries - already marked as FAILURE, so just return it
501                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
502                 controller.complete(operation);
503                 return new CompletableFuture<>();
504             }
505
506             /*
507              * Retry the operation.
508              */
509             long waitMs = getRetryWaitMs();
510             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
511
512             return sleep(waitMs, TimeUnit.MILLISECONDS)
513                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
514         };
515     }
516
517     /**
518      * Convenience method that starts a sleep(), running via a future.
519      *
520      * @param sleepTime time to sleep
521      * @param unit time unit
522      * @return a future that will complete when the sleep completes
523      */
524     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
525         if (sleepTime <= 0) {
526             return CompletableFuture.completedFuture(null);
527         }
528
529         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
530     }
531
532     /**
533      * Converts an exception into an operation outcome, returning a copy of the outcome to
534      * prevent background jobs from changing it.
535      *
536      * @param type type of item throwing the exception
537      * @return a function that will convert an exception into an operation outcome
538      */
539     private Function<Throwable, OperationOutcome> fromException(String type) {
540
541         return thrown -> {
542             OperationOutcome outcome = params.makeOutcome();
543
544             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
545                 // do not include exception in the message, as it just clutters the log
546                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
547                                 params.getRequestId());
548             } else {
549                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
550                                 params.getRequestId(), thrown);
551             }
552
553             return setOutcome(outcome, thrown);
554         };
555     }
556
557     /**
558      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
559      * any outstanding futures when one completes.
560      *
561      * @param futureMakers function to make a future. If the function returns
562      *        {@code null}, then no future is created for that function. On the other
563      *        hand, if the function throws an exception, then the previously created
564      *        functions are canceled and the exception is re-thrown
565      * @return a future to cancel or await an outcome, or {@code null} if no futures were
566      *         created. If this future is canceled, then all of the futures will be
567      *         canceled
568      */
569     public CompletableFuture<OperationOutcome> anyOf(
570                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
571
572         return anyOf(Arrays.asList(futureMakers));
573     }
574
575     /**
576      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
577      * any outstanding futures when one completes.
578      *
579      * @param futureMakers function to make a future. If the function returns
580      *        {@code null}, then no future is created for that function. On the other
581      *        hand, if the function throws an exception, then the previously created
582      *        functions are canceled and the exception is re-thrown
583      * @return a future to cancel or await an outcome, or {@code null} if no futures were
584      *         created. If this future is canceled, then all of the futures will be
585      *         canceled. Similarly, when this future completes, any incomplete futures
586      *         will be canceled
587      */
588     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
589
590         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
591
592         CompletableFuture<OperationOutcome>[] futures =
593                         attachFutures(controller, futureMakers, UnaryOperator.identity());
594
595         if (futures.length == 0) {
596             // no futures were started
597             return null;
598         }
599
600         if (futures.length == 1) {
601             return futures[0];
602         }
603
604         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
605                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
606
607         return controller;
608     }
609
610     /**
611      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
612      *
613      * @param futureMakers function to make a future. If the function returns
614      *        {@code null}, then no future is created for that function. On the other
615      *        hand, if the function throws an exception, then the previously created
616      *        functions are canceled and the exception is re-thrown
617      * @return a future to cancel or await an outcome, or {@code null} if no futures were
618      *         created. If this future is canceled, then all of the futures will be
619      *         canceled
620      */
621     public CompletableFuture<OperationOutcome> allOf(
622                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
623
624         return allOf(Arrays.asList(futureMakers));
625     }
626
627     /**
628      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
629      *
630      * @param futureMakers function to make a future. If the function returns
631      *        {@code null}, then no future is created for that function. On the other
632      *        hand, if the function throws an exception, then the previously created
633      *        functions are canceled and the exception is re-thrown
634      * @return a future to cancel or await an outcome, or {@code null} if no futures were
635      *         created. If this future is canceled, then all of the futures will be
636      *         canceled. Similarly, when this future completes, any incomplete futures
637      *         will be canceled
638      */
639     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
640         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
641
642         Queue<OperationOutcome> outcomes = new LinkedList<>();
643
644         CompletableFuture<OperationOutcome>[] futures =
645                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
646                             synchronized (outcomes) {
647                                 outcomes.add(outcome);
648                             }
649                             return outcome;
650                         }));
651
652         if (futures.length == 0) {
653             // no futures were started
654             return null;
655         }
656
657         if (futures.length == 1) {
658             return futures[0];
659         }
660
661         // @formatter:off
662         CompletableFuture.allOf(futures)
663                         .thenApply(unused -> combineOutcomes(outcomes))
664                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
665         // @formatter:on
666
667         return controller;
668     }
669
670     /**
671      * Invokes the functions to create the futures and attaches them to the controller.
672      *
673      * @param controller master controller for all of the futures
674      * @param futureMakers futures to be attached to the controller
675      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
676      *        Returns the adorned future
677      * @return an array of futures, possibly zero-length. If the array is of size one,
678      *         then that one item should be returned instead of the controller
679      */
680     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
681                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
682                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
683
684         if (futureMakers.isEmpty()) {
685             @SuppressWarnings("unchecked")
686             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
687             return result;
688         }
689
690         // the last, unadorned future that is created
691         CompletableFuture<OperationOutcome> lastFuture = null;
692
693         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
694
695         // make each future
696         for (var maker : futureMakers) {
697             try {
698                 CompletableFuture<OperationOutcome> future = maker.get();
699                 if (future == null) {
700                     continue;
701                 }
702
703                 // propagate "stop" to the future
704                 controller.add(future);
705
706                 futures.add(adorn.apply(future));
707
708                 lastFuture = future;
709
710             } catch (RuntimeException e) {
711                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
712                 controller.cancel(false);
713                 throw e;
714             }
715         }
716
717         @SuppressWarnings("unchecked")
718         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
719
720         if (result.length == 1) {
721             // special case - return the unadorned future
722             result[0] = lastFuture;
723             return result;
724         }
725
726         return futures.toArray(result);
727     }
728
729     /**
730      * Combines the outcomes from a set of tasks.
731      *
732      * @param outcomes outcomes to be examined
733      * @return the combined outcome
734      */
735     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
736
737         // identify the outcome with the highest priority
738         OperationOutcome outcome = outcomes.remove();
739         int priority = detmPriority(outcome);
740
741         for (OperationOutcome outcome2 : outcomes) {
742             int priority2 = detmPriority(outcome2);
743
744             if (priority2 > priority) {
745                 outcome = outcome2;
746                 priority = priority2;
747             }
748         }
749
750         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
751                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
752
753         return outcome;
754     }
755
756     /**
757      * Determines the priority of an outcome based on its result.
758      *
759      * @param outcome outcome to examine, or {@code null}
760      * @return the outcome's priority
761      */
762     protected int detmPriority(OperationOutcome outcome) {
763         if (outcome == null || outcome.getResult() == null) {
764             return 1;
765         }
766
767         switch (outcome.getResult()) {
768             case SUCCESS:
769                 return 0;
770
771             case FAILURE_GUARD:
772                 return 2;
773
774             case FAILURE_RETRIES:
775                 return 3;
776
777             case FAILURE:
778                 return 4;
779
780             case FAILURE_TIMEOUT:
781                 return 5;
782
783             case FAILURE_EXCEPTION:
784             default:
785                 return 6;
786         }
787     }
788
789     /**
790      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
791      * not created until the previous task completes. The pipeline returns the outcome of
792      * the last task executed.
793      *
794      * @param futureMakers functions to make the futures
795      * @return a future to cancel the sequence or await the outcome
796      */
797     public CompletableFuture<OperationOutcome> sequence(
798                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
799
800         return sequence(Arrays.asList(futureMakers));
801     }
802
803     /**
804      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
805      * not created until the previous task completes. The pipeline returns the outcome of
806      * the last task executed.
807      *
808      * @param futureMakers functions to make the futures
809      * @return a future to cancel the sequence or await the outcome, or {@code null} if
810      *         there were no tasks to perform
811      */
812     public CompletableFuture<OperationOutcome> sequence(
813                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
814
815         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
816
817         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
818         if (nextTask == null) {
819             // no tasks
820             return null;
821         }
822
823         if (queue.isEmpty()) {
824             // only one task - just return it rather than wrapping it in a controller
825             return nextTask;
826         }
827
828         /*
829          * multiple tasks - need a controller to stop whichever task is currently
830          * executing
831          */
832         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
833         final Executor executor = params.getExecutor();
834
835         // @formatter:off
836         controller.wrap(nextTask)
837                     .thenCompose(nextTaskOnSuccess(controller, queue))
838                     .whenCompleteAsync(controller.delayedComplete(), executor);
839         // @formatter:on
840
841         return controller;
842     }
843
844     /**
845      * Executes the next task in the queue, if the previous outcome was successful.
846      *
847      * @param controller pipeline controller
848      * @param taskQueue queue of tasks to be performed
849      * @return a future to execute the remaining tasks, or the current outcome, if it's a
850      *         failure, or if there are no more tasks
851      */
852     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
853                     PipelineControllerFuture<OperationOutcome> controller,
854                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
855
856         return outcome -> {
857             if (!isSuccess(outcome)) {
858                 // return the failure
859                 return CompletableFuture.completedFuture(outcome);
860             }
861
862             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
863             if (nextTask == null) {
864                 // no tasks - just return the success
865                 return CompletableFuture.completedFuture(outcome);
866             }
867
868             // @formatter:off
869             return controller
870                         .wrap(nextTask)
871                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
872             // @formatter:on
873         };
874     }
875
876     /**
877      * Gets the next task from the queue, skipping those that are {@code null}.
878      *
879      * @param taskQueue task queue
880      * @return the next task, or {@code null} if the queue is now empty
881      */
882     private CompletableFuture<OperationOutcome> getNextTask(
883                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
884
885         Supplier<CompletableFuture<OperationOutcome>> maker;
886
887         while ((maker = taskQueue.poll()) != null) {
888             CompletableFuture<OperationOutcome> future = maker.get();
889             if (future != null) {
890                 return future;
891             }
892         }
893
894         return null;
895     }
896
897     /**
898      * Sets the start time of the operation and invokes the callback to indicate that the
899      * operation has started. Does nothing if the pipeline has been stopped.
900      * <p/>
901      * This assumes that the "outcome" is not {@code null}.
902      *
903      * @param callbacks used to determine if the start callback can be invoked
904      * @return a function that sets the start time and invokes the callback
905      */
906     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
907
908         return (outcome, thrown) -> {
909
910             if (callbacks.canStart()) {
911                 outcome.setSubRequestId(getSubRequestId());
912                 outcome.setStart(callbacks.getStartTime());
913                 outcome.setEnd(null);
914
915                 // pass a copy to the callback
916                 OperationOutcome outcome2 = new OperationOutcome(outcome);
917                 outcome2.setFinalOutcome(false);
918                 params.callbackStarted(outcome2);
919             }
920         };
921     }
922
923     /**
924      * Sets the end time of the operation and invokes the callback to indicate that the
925      * operation has completed. Does nothing if the pipeline has been stopped.
926      * <p/>
927      * This assumes that the "outcome" is not {@code null}.
928      * <p/>
929      * Note: the start time must be a reference rather than a plain value, because it's
930      * value must be gotten on-demand, when the returned function is executed at a later
931      * time.
932      *
933      * @param callbacks used to determine if the end callback can be invoked
934      * @return a function that sets the end time and invokes the callback
935      */
936     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
937
938         return (outcome, thrown) -> {
939             if (callbacks.canEnd()) {
940                 outcome.setSubRequestId(getSubRequestId());
941                 outcome.setStart(callbacks.getStartTime());
942                 outcome.setEnd(callbacks.getEndTime());
943
944                 // pass a copy to the callback
945                 params.callbackCompleted(new OperationOutcome(outcome));
946             }
947         };
948     }
949
950     /**
951      * Sets an operation's outcome and message, based on a throwable.
952      *
953      * @param operation operation to be updated
954      * @return the updated operation
955      */
956     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
957         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
958         return setOutcome(operation, result);
959     }
960
961     /**
962      * Sets an operation's outcome and default message based on the result.
963      *
964      * @param operation operation to be updated
965      * @param result result of the operation
966      * @return the updated operation
967      */
968     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
969         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
970         operation.setResult(result);
971         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
972                         : ControlLoopOperation.FAILED_MSG);
973
974         return operation;
975     }
976
977     /**
978      * Determines if a throwable is due to a timeout.
979      *
980      * @param thrown throwable of interest
981      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
982      */
983     protected boolean isTimeout(Throwable thrown) {
984         if (thrown instanceof CompletionException) {
985             thrown = thrown.getCause();
986         }
987
988         return (thrown instanceof TimeoutException);
989     }
990
991     /**
992      * Logs a message. If the message is not of type, String, then it attempts to
993      * pretty-print it into JSON before logging.
994      *
995      * @param direction IN or OUT
996      * @param infra communication infrastructure on which it was published
997      * @param source source name (e.g., the URL or Topic name)
998      * @param message message to be logged
999      * @return the JSON text that was logged
1000      */
1001     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
1002         String json;
1003         try {
1004             json = prettyPrint(message);
1005
1006         } catch (IllegalArgumentException e) {
1007             String type = (direction == EventType.IN ? "response" : "request");
1008             logger.warn("cannot pretty-print {}", type, e);
1009             json = message.toString();
1010         }
1011
1012         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1013
1014         return json;
1015     }
1016
1017     /**
1018      * Converts a message to a "pretty-printed" String using the operation's normal
1019      * serialization provider (i.e., it's <i>coder</i>).
1020      *
1021      * @param message response to be logged
1022      * @return the JSON text that was logged
1023      * @throws IllegalArgumentException if the message cannot be converted
1024      */
1025     public <T> String prettyPrint(T message) {
1026         if (message == null) {
1027             return null;
1028         } else if (message instanceof String) {
1029             return message.toString();
1030         } else {
1031             try {
1032                 return getCoder().encode(message, true);
1033             } catch (CoderException e) {
1034                 throw new IllegalArgumentException("cannot encode message", e);
1035             }
1036         }
1037     }
1038
1039     // these may be overridden by subclasses or junit tests
1040
1041     /**
1042      * Gets the retry count.
1043      *
1044      * @param retry retry, extracted from the parameters, or {@code null}
1045      * @return the number of retries, or {@code 0} if no retries were specified
1046      */
1047     protected int getRetry(Integer retry) {
1048         return (retry == null ? 0 : retry);
1049     }
1050
1051     /**
1052      * Gets the retry wait, in milliseconds.
1053      *
1054      * @return the retry wait, in milliseconds
1055      */
1056     protected long getRetryWaitMs() {
1057         return DEFAULT_RETRY_WAIT_MS;
1058     }
1059
1060     /**
1061      * Gets the operation timeout.
1062      *
1063      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1064      *        {@code null}
1065      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1066      *         specified
1067      */
1068     protected long getTimeoutMs(Integer timeoutSec) {
1069         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1070     }
1071
1072     // these may be overridden by junit tests
1073
1074     protected Coder getCoder() {
1075         return coder;
1076     }
1077 }