Merge "Support separate VF Counts for different Targets"
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedHashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.concurrent.CompletionException;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.function.BiConsumer;
37 import java.util.function.Function;
38 import java.util.function.Supplier;
39 import java.util.function.UnaryOperator;
40 import lombok.Getter;
41 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
42 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
43 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
44 import org.onap.policy.common.utils.coder.Coder;
45 import org.onap.policy.common.utils.coder.CoderException;
46 import org.onap.policy.common.utils.coder.StandardCoder;
47 import org.onap.policy.controlloop.ControlLoopOperation;
48 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
49 import org.onap.policy.controlloop.actorserviceprovider.Operation;
50 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
51 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
52 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
53 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
54 import org.onap.policy.controlloop.policy.PolicyResult;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * Partial implementation of an operator. In general, it's preferable that subclasses
60  * would override {@link #startOperationAsync(int, OperationOutcome)
61  * startOperationAsync()}. However, if that proves to be too difficult, then they can
62  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
63  * if the operation requires any preprocessor steps, the subclass may choose to override
64  * {@link #startPreprocessorAsync()}.
65  * <p/>
66  * The futures returned by the methods within this class can be canceled, and will
67  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
68  * returned by overridden methods will do the same. Of course, if a class overrides
69  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
70  * be done to cancel that particular operation.
71  */
72 public abstract class OperationPartial implements Operation {
73     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
74     private static final Coder coder = new StandardCoder();
75
76     public static final String GUARD_ACTOR_NAME = "GUARD";
77     public static final String GUARD_OPERATION_NAME = "Decision";
78     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
79
80     private final OperatorConfig config;
81
82     /**
83      * Operation parameters.
84      */
85     protected final ControlLoopOperationParams params;
86
87     @Getter
88     private final String fullName;
89
90
91     /**
92      * Constructs the object.
93      *
94      * @param params operation parameters
95      * @param config configuration for this operation
96      */
97     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
98         this.params = params;
99         this.config = config;
100         this.fullName = params.getActor() + "." + params.getOperation();
101     }
102
103     public Executor getBlockingExecutor() {
104         return config.getBlockingExecutor();
105     }
106
107     public String getActorName() {
108         return params.getActor();
109     }
110
111     public String getName() {
112         return params.getOperation();
113     }
114
115     @Override
116     public CompletableFuture<OperationOutcome> start() {
117         // allocate a controller for the entire operation
118         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
119
120         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
121         if (preproc == null) {
122             // no preprocessor required - just start the operation
123             return startOperationAttempt(controller, 1);
124         }
125
126         /*
127          * Do preprocessor first and then, if successful, start the operation. Note:
128          * operations create their own outcome, ignoring the outcome from any previous
129          * steps.
130          *
131          * Wrap the preprocessor to ensure "stop" is propagated to it.
132          */
133         // @formatter:off
134         controller.wrap(preproc)
135                         .exceptionally(fromException("preprocessor of operation"))
136                         .thenCompose(handlePreprocessorFailure(controller))
137                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
138                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
139         // @formatter:on
140
141         return controller;
142     }
143
144     /**
145      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
146      * invokes the call-backs, marks the controller complete, and returns an incomplete
147      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
148      * received.
149      * <p/>
150      * Assumes that no callbacks have been invoked yet.
151      *
152      * @param controller pipeline controller
153      * @return a function that checks the outcome status and continues, if successful, or
154      *         indicates a failure otherwise
155      */
156     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
157                     PipelineControllerFuture<OperationOutcome> controller) {
158
159         return outcome -> {
160
161             if (outcome != null && isSuccess(outcome)) {
162                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
163                 return CompletableFuture.completedFuture(outcome);
164             }
165
166             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
167
168             final Executor executor = params.getExecutor();
169             final CallbackManager callbacks = new CallbackManager();
170
171             // propagate "stop" to the callbacks
172             controller.add(callbacks);
173
174             final OperationOutcome outcome2 = params.makeOutcome();
175
176             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
177
178             outcome2.setFinalOutcome(true);
179             outcome2.setResult(PolicyResult.FAILURE_GUARD);
180             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
181
182             // @formatter:off
183             CompletableFuture.completedFuture(outcome2)
184                             .whenCompleteAsync(callbackStarted(callbacks), executor)
185                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
186                             .whenCompleteAsync(controller.delayedComplete(), executor);
187             // @formatter:on
188
189             return new CompletableFuture<>();
190         };
191     }
192
193     /**
194      * Invokes the operation's preprocessor step(s) as a "future". This method simply
195      * returns {@code null}.
196      * <p/>
197      * This method assumes the following:
198      * <ul>
199      * <li>the operator is alive</li>
200      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
201      * </ul>
202      *
203      * @return a function that will start the preprocessor and returns its outcome, or
204      *         {@code null} if this operation needs no preprocessor
205      */
206     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
207         return null;
208     }
209
210     /**
211      * Invokes the operation's guard step(s) as a "future".
212      * <p/>
213      * This method assumes the following:
214      * <ul>
215      * <li>the operator is alive</li>
216      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
217      * </ul>
218      *
219      * @return a function that will start the guard checks and returns its outcome, or
220      *         {@code null} if this operation has no guard
221      */
222     protected CompletableFuture<OperationOutcome> startGuardAsync() {
223         // get the guard payload
224         Map<String, Object> guardPayload = makeGuardPayload();
225
226         // wrap it in a "resource"
227         Map<String, Object> resource = new LinkedHashMap<>();
228         resource.put("guard", guardPayload);
229
230         Map<String, Object> payload = new LinkedHashMap<>();
231         payload.put("resource", resource);
232
233         /*
234          * Note: can't use constants from actor.guard, because that would create a
235          * circular dependency.
236          */
237         return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
238                         .payload(payload).build().start();
239     }
240
241     /**
242      * Creates a payload to execute a guard operation.
243      *
244      * @return a new guard payload
245      */
246     protected Map<String, Object> makeGuardPayload() {
247         Map<String, Object> guard = new LinkedHashMap<>();
248         guard.put("actor", params.getActor());
249         guard.put("recipe", params.getOperation());
250         guard.put("target", params.getTargetEntity());
251         guard.put("requestId", params.getRequestId());
252
253         String clname = params.getContext().getEvent().getClosedLoopControlName();
254         if (clname != null) {
255             guard.put("clname", clname);
256         }
257
258         return guard;
259     }
260
261     /**
262      * Starts the operation attempt, with no preprocessor. When all retries complete, it
263      * will complete the controller.
264      *
265      * @param controller controller for all operation attempts
266      * @param attempt attempt number, typically starting with 1
267      * @return a future that will return the final result of all attempts
268      */
269     private CompletableFuture<OperationOutcome> startOperationAttempt(
270                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
271
272         // propagate "stop" to the operation attempt
273         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
274                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
275
276         return controller;
277     }
278
279     /**
280      * Starts the operation attempt, without doing any retries.
281      *
282      * @param params operation parameters
283      * @param attempt attempt number, typically starting with 1
284      * @return a future that will return the result of a single operation attempt
285      */
286     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
287
288         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
289
290         final Executor executor = params.getExecutor();
291         final OperationOutcome outcome = params.makeOutcome();
292         final CallbackManager callbacks = new CallbackManager();
293
294         // this operation attempt gets its own controller
295         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
296
297         // propagate "stop" to the callbacks
298         controller.add(callbacks);
299
300         // @formatter:off
301         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
302                         .whenCompleteAsync(callbackStarted(callbacks), executor)
303                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
304         // @formatter:on
305
306         // handle timeouts, if specified
307         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
308         if (timeoutMillis > 0) {
309             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
310             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
311         }
312
313         /*
314          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
315          * before callbackCompleted() is invoked.
316          *
317          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
318          * the pipeline as the last step anyway.
319          */
320
321         // @formatter:off
322         future.exceptionally(fromException("operation"))
323                     .thenApply(setRetryFlag(attempt))
324                     .whenCompleteAsync(callbackStarted(callbacks), executor)
325                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
326                     .whenCompleteAsync(controller.delayedComplete(), executor);
327         // @formatter:on
328
329         return controller;
330     }
331
332     /**
333      * Determines if the outcome was successful.
334      *
335      * @param outcome outcome to examine
336      * @return {@code true} if the outcome was successful
337      */
338     protected boolean isSuccess(OperationOutcome outcome) {
339         return (outcome.getResult() == PolicyResult.SUCCESS);
340     }
341
342     /**
343      * Determines if the outcome was a failure for this operator.
344      *
345      * @param outcome outcome to examine, or {@code null}
346      * @return {@code true} if the outcome is not {@code null} and was a failure
347      *         <i>and</i> was associated with this operator, {@code false} otherwise
348      */
349     protected boolean isActorFailed(OperationOutcome outcome) {
350         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
351     }
352
353     /**
354      * Determines if the given outcome is for this operation.
355      *
356      * @param outcome outcome to examine
357      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
358      */
359     protected boolean isSameOperation(OperationOutcome outcome) {
360         return OperationOutcome.isFor(outcome, getActorName(), getName());
361     }
362
363     /**
364      * Invokes the operation as a "future". This method simply invokes
365      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
366      * returning the result via a "future".
367      * <p/>
368      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
369      * the executor in the "params", as that may bring the background thread pool to a
370      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
371      * instead.
372      * <p/>
373      * This method assumes the following:
374      * <ul>
375      * <li>the operator is alive</li>
376      * <li>verifyRunning() has been invoked</li>
377      * <li>callbackStarted() has been invoked</li>
378      * <li>the invoker will perform appropriate timeout checks</li>
379      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
380      * </ul>
381      *
382      * @param attempt attempt number, typically starting with 1
383      * @return a function that will start the operation and return its result when
384      *         complete
385      */
386     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
387
388         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
389     }
390
391     /**
392      * Low-level method that performs the operation. This can make the same assumptions
393      * that are made by {@link #doOperationAsFuture()}. This particular method simply
394      * throws an {@link UnsupportedOperationException}.
395      *
396      * @param attempt attempt number, typically starting with 1
397      * @param operation the operation being performed
398      * @return the outcome of the operation
399      */
400     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
401
402         throw new UnsupportedOperationException("start operation " + getFullName());
403     }
404
405     /**
406      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
407      * FAILURE, assuming the policy specifies retries and the retry count has been
408      * exhausted.
409      *
410      * @param attempt latest attempt number, starting with 1
411      * @return a function to get the next future to execute
412      */
413     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
414
415         return origOutcome -> {
416             // ensure we have a non-null outcome
417             OperationOutcome outcome;
418             if (origOutcome != null) {
419                 outcome = origOutcome;
420             } else {
421                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
422                 outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
423             }
424
425             // ensure correct actor/operation
426             outcome.setActor(getActorName());
427             outcome.setOperation(getName());
428
429             // determine if we should retry, based on the result
430             if (outcome.getResult() != PolicyResult.FAILURE) {
431                 // do not retry success or other failure types (e.g., exception)
432                 outcome.setFinalOutcome(true);
433                 return outcome;
434             }
435
436             int retry = getRetry(params.getRetry());
437             if (retry <= 0) {
438                 // no retries were specified
439                 outcome.setFinalOutcome(true);
440
441             } else if (attempt <= retry) {
442                 // have more retries - not the final outcome
443                 outcome.setFinalOutcome(false);
444
445             } else {
446                 /*
447                  * retries were specified and we've already tried them all - change to
448                  * FAILURE_RETRIES
449                  */
450                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
451                 outcome.setResult(PolicyResult.FAILURE_RETRIES);
452                 outcome.setFinalOutcome(true);
453             }
454
455             return outcome;
456         };
457     }
458
459     /**
460      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
461      * was previously invoked, and thus that the "operation" is not {@code null}.
462      *
463      * @param controller controller for all of the retries
464      * @param attempt latest attempt number, starting with 1
465      * @return a function to get the next future to execute
466      */
467     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
468                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
469
470         return operation -> {
471             if (!isActorFailed(operation)) {
472                 // wrong type or wrong operation - just leave it as is
473                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
474                 controller.complete(operation);
475                 return new CompletableFuture<>();
476             }
477
478             if (getRetry(params.getRetry()) <= 0) {
479                 // no retries - already marked as FAILURE, so just return it
480                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
481                 controller.complete(operation);
482                 return new CompletableFuture<>();
483             }
484
485             /*
486              * Retry the operation.
487              */
488             long waitMs = getRetryWaitMs();
489             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
490
491             return sleep(waitMs, TimeUnit.MILLISECONDS)
492                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
493         };
494     }
495
496     /**
497      * Convenience method that starts a sleep(), running via a future.
498      *
499      * @param sleepTime time to sleep
500      * @param unit time unit
501      * @return a future that will complete when the sleep completes
502      */
503     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
504         if (sleepTime <= 0) {
505             return CompletableFuture.completedFuture(null);
506         }
507
508         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
509     }
510
511     /**
512      * Converts an exception into an operation outcome, returning a copy of the outcome to
513      * prevent background jobs from changing it.
514      *
515      * @param type type of item throwing the exception
516      * @return a function that will convert an exception into an operation outcome
517      */
518     private Function<Throwable, OperationOutcome> fromException(String type) {
519
520         return thrown -> {
521             OperationOutcome outcome = params.makeOutcome();
522
523             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
524                             params.getRequestId(), thrown);
525
526             return setOutcome(outcome, thrown);
527         };
528     }
529
530     /**
531      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
532      * any outstanding futures when one completes.
533      *
534      * @param futureMakers function to make a future. If the function returns
535      *        {@code null}, then no future is created for that function. On the other
536      *        hand, if the function throws an exception, then the previously created
537      *        functions are canceled and the exception is re-thrown
538      * @return a future to cancel or await an outcome, or {@code null} if no futures were
539      *         created. If this future is canceled, then all of the futures will be
540      *         canceled
541      */
542     public CompletableFuture<OperationOutcome> anyOf(
543                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
544
545         return anyOf(Arrays.asList(futureMakers));
546     }
547
548     /**
549      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
550      * any outstanding futures when one completes.
551      *
552      * @param futureMakers function to make a future. If the function returns
553      *        {@code null}, then no future is created for that function. On the other
554      *        hand, if the function throws an exception, then the previously created
555      *        functions are canceled and the exception is re-thrown
556      * @return a future to cancel or await an outcome, or {@code null} if no futures were
557      *         created. If this future is canceled, then all of the futures will be
558      *         canceled. Similarly, when this future completes, any incomplete futures
559      *         will be canceled
560      */
561     public CompletableFuture<OperationOutcome> anyOf(
562                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
563
564         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
565
566         CompletableFuture<OperationOutcome>[] futures =
567                         attachFutures(controller, futureMakers, UnaryOperator.identity());
568
569         if (futures.length == 0) {
570             // no futures were started
571             return null;
572         }
573
574         if (futures.length == 1) {
575             return futures[0];
576         }
577
578         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
579                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
580
581         return controller;
582     }
583
584     /**
585      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
586      *
587      * @param futureMakers function to make a future. If the function returns
588      *        {@code null}, then no future is created for that function. On the other
589      *        hand, if the function throws an exception, then the previously created
590      *        functions are canceled and the exception is re-thrown
591      * @return a future to cancel or await an outcome, or {@code null} if no futures were
592      *         created. If this future is canceled, then all of the futures will be
593      *         canceled
594      */
595     public CompletableFuture<OperationOutcome> allOf(
596                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
597
598         return allOf(Arrays.asList(futureMakers));
599     }
600
601     /**
602      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
603      *
604      * @param futureMakers function to make a future. If the function returns
605      *        {@code null}, then no future is created for that function. On the other
606      *        hand, if the function throws an exception, then the previously created
607      *        functions are canceled and the exception is re-thrown
608      * @return a future to cancel or await an outcome, or {@code null} if no futures were
609      *         created. If this future is canceled, then all of the futures will be
610      *         canceled. Similarly, when this future completes, any incomplete futures
611      *         will be canceled
612      */
613     public CompletableFuture<OperationOutcome> allOf(
614                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
615         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
616
617         Queue<OperationOutcome> outcomes = new LinkedList<>();
618
619         CompletableFuture<OperationOutcome>[] futures =
620                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
621                             synchronized (outcomes) {
622                                 outcomes.add(outcome);
623                             }
624                             return outcome;
625                         }));
626
627         if (futures.length == 0) {
628             // no futures were started
629             return null;
630         }
631
632         if (futures.length == 1) {
633             return futures[0];
634         }
635
636         // @formatter:off
637         CompletableFuture.allOf(futures)
638                         .thenApply(unused -> combineOutcomes(outcomes))
639                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
640         // @formatter:on
641
642         return controller;
643     }
644
645     /**
646      * Invokes the functions to create the futures and attaches them to the controller.
647      *
648      * @param controller master controller for all of the futures
649      * @param futureMakers futures to be attached to the controller
650      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
651      *        Returns the adorned future
652      * @return an array of futures, possibly zero-length. If the array is of size one,
653      *         then that one item should be returned instead of the controller
654      */
655     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
656                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
657                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
658
659         if (futureMakers.isEmpty()) {
660             @SuppressWarnings("unchecked")
661             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
662             return result;
663         }
664
665         // the last, unadorned future that is created
666         CompletableFuture<OperationOutcome> lastFuture = null;
667
668         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
669
670         // make each future
671         for (var maker : futureMakers) {
672             try {
673                 CompletableFuture<OperationOutcome> future = maker.get();
674                 if (future == null) {
675                     continue;
676                 }
677
678                 // propagate "stop" to the future
679                 controller.add(future);
680
681                 futures.add(adorn.apply(future));
682
683                 lastFuture = future;
684
685             } catch (RuntimeException e) {
686                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
687                 controller.cancel(false);
688                 throw e;
689             }
690         }
691
692         @SuppressWarnings("unchecked")
693         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
694
695         if (result.length == 1) {
696             // special case - return the unadorned future
697             result[0] = lastFuture;
698             return result;
699         }
700
701         return futures.toArray(result);
702     }
703
704     /**
705      * Combines the outcomes from a set of tasks.
706      *
707      * @param outcomes outcomes to be examined
708      * @return the combined outcome
709      */
710     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
711
712         // identify the outcome with the highest priority
713         OperationOutcome outcome = outcomes.remove();
714         int priority = detmPriority(outcome);
715
716         for (OperationOutcome outcome2 : outcomes) {
717             int priority2 = detmPriority(outcome2);
718
719             if (priority2 > priority) {
720                 outcome = outcome2;
721                 priority = priority2;
722             }
723         }
724
725         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
726                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
727
728         return outcome;
729     }
730
731     /**
732      * Determines the priority of an outcome based on its result.
733      *
734      * @param outcome outcome to examine, or {@code null}
735      * @return the outcome's priority
736      */
737     protected int detmPriority(OperationOutcome outcome) {
738         if (outcome == null || outcome.getResult() == null) {
739             return 1;
740         }
741
742         switch (outcome.getResult()) {
743             case SUCCESS:
744                 return 0;
745
746             case FAILURE_GUARD:
747                 return 2;
748
749             case FAILURE_RETRIES:
750                 return 3;
751
752             case FAILURE:
753                 return 4;
754
755             case FAILURE_TIMEOUT:
756                 return 5;
757
758             case FAILURE_EXCEPTION:
759             default:
760                 return 6;
761         }
762     }
763
764     /**
765      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
766      * not created until the previous task completes. The pipeline returns the outcome of
767      * the last task executed.
768      *
769      * @param futureMakers functions to make the futures
770      * @return a future to cancel the sequence or await the outcome
771      */
772     public CompletableFuture<OperationOutcome> sequence(
773                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
774
775         return sequence(Arrays.asList(futureMakers));
776     }
777
778     /**
779      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
780      * not created until the previous task completes. The pipeline returns the outcome of
781      * the last task executed.
782      *
783      * @param futureMakers functions to make the futures
784      * @return a future to cancel the sequence or await the outcome, or {@code null} if
785      *         there were no tasks to perform
786      */
787     public CompletableFuture<OperationOutcome> sequence(
788                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
789
790         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
791
792         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
793         if (nextTask == null) {
794             // no tasks
795             return null;
796         }
797
798         if (queue.isEmpty()) {
799             // only one task - just return it rather than wrapping it in a controller
800             return nextTask;
801         }
802
803         /*
804          * multiple tasks - need a controller to stop whichever task is currently
805          * executing
806          */
807         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
808         final Executor executor = params.getExecutor();
809
810         // @formatter:off
811         controller.wrap(nextTask)
812                     .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
813                     .whenCompleteAsync(controller.delayedComplete(), executor);
814         // @formatter:on
815
816         return controller;
817     }
818
819     /**
820      * Executes the next task in the queue, if the previous outcome was successful.
821      *
822      * @param controller pipeline controller
823      * @param taskQueue queue of tasks to be performed
824      * @return a future to execute the remaining tasks, or the current outcome, if it's a
825      *         failure, or if there are no more tasks
826      */
827     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
828                     PipelineControllerFuture<OperationOutcome> controller,
829                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
830
831         return outcome -> {
832             if (!isSuccess(outcome)) {
833                 // return the failure
834                 return CompletableFuture.completedFuture(outcome);
835             }
836
837             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
838             if (nextTask == null) {
839                 // no tasks - just return the success
840                 return CompletableFuture.completedFuture(outcome);
841             }
842
843             // @formatter:off
844             return controller
845                         .wrap(nextTask)
846                         .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
847             // @formatter:on
848         };
849     }
850
851     /**
852      * Gets the next task from the queue, skipping those that are {@code null}.
853      *
854      * @param taskQueue task queue
855      * @return the next task, or {@code null} if the queue is now empty
856      */
857     private CompletableFuture<OperationOutcome> getNextTask(
858                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
859
860         Supplier<CompletableFuture<OperationOutcome>> maker;
861
862         while ((maker = taskQueue.poll()) != null) {
863             CompletableFuture<OperationOutcome> future = maker.get();
864             if (future != null) {
865                 return future;
866             }
867         }
868
869         return null;
870     }
871
872     /**
873      * Sets the start time of the operation and invokes the callback to indicate that the
874      * operation has started. Does nothing if the pipeline has been stopped.
875      * <p/>
876      * This assumes that the "outcome" is not {@code null}.
877      *
878      * @param callbacks used to determine if the start callback can be invoked
879      * @return a function that sets the start time and invokes the callback
880      */
881     private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
882
883         return (outcome, thrown) -> {
884
885             if (callbacks.canStart()) {
886                 outcome.setStart(callbacks.getStartTime());
887                 outcome.setEnd(null);
888
889                 // pass a copy to the callback
890                 OperationOutcome outcome2 = new OperationOutcome(outcome);
891                 outcome2.setFinalOutcome(false);
892                 params.callbackStarted(outcome2);
893             }
894         };
895     }
896
897     /**
898      * Sets the end time of the operation and invokes the callback to indicate that the
899      * operation has completed. Does nothing if the pipeline has been stopped.
900      * <p/>
901      * This assumes that the "outcome" is not {@code null}.
902      * <p/>
903      * Note: the start time must be a reference rather than a plain value, because it's
904      * value must be gotten on-demand, when the returned function is executed at a later
905      * time.
906      *
907      * @param callbacks used to determine if the end callback can be invoked
908      * @return a function that sets the end time and invokes the callback
909      */
910     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
911
912         return (outcome, thrown) -> {
913             if (callbacks.canEnd()) {
914                 outcome.setStart(callbacks.getStartTime());
915                 outcome.setEnd(callbacks.getEndTime());
916
917                 // pass a copy to the callback
918                 params.callbackCompleted(new OperationOutcome(outcome));
919             }
920         };
921     }
922
923     /**
924      * Sets an operation's outcome and message, based on a throwable.
925      *
926      * @param operation operation to be updated
927      * @return the updated operation
928      */
929     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
930         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
931         return setOutcome(operation, result);
932     }
933
934     /**
935      * Sets an operation's outcome and default message based on the result.
936      *
937      * @param operation operation to be updated
938      * @param result result of the operation
939      * @return the updated operation
940      */
941     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
942         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
943         operation.setResult(result);
944         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
945                         : ControlLoopOperation.FAILED_MSG);
946
947         return operation;
948     }
949
950     /**
951      * Determines if a throwable is due to a timeout.
952      *
953      * @param thrown throwable of interest
954      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
955      */
956     protected boolean isTimeout(Throwable thrown) {
957         if (thrown instanceof CompletionException) {
958             thrown = thrown.getCause();
959         }
960
961         return (thrown instanceof TimeoutException);
962     }
963
964     /**
965      * Logs a response. If the response is not of type, String, then it attempts to
966      * pretty-print it into JSON before logging.
967      *
968      * @param direction IN or OUT
969      * @param infra communication infrastructure on which it was published
970      * @param source source name (e.g., the URL or Topic name)
971      * @param response response to be logged
972      * @return the JSON text that was logged
973      */
974     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
975         String json;
976         try {
977             if (response == null) {
978                 json = null;
979             } else if (response instanceof String) {
980                 json = response.toString();
981             } else {
982                 json = makeCoder().encode(response, true);
983             }
984
985         } catch (CoderException e) {
986             String type = (direction == EventType.IN ? "response" : "request");
987             logger.warn("cannot pretty-print {}", type, e);
988             json = response.toString();
989         }
990
991         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
992
993         return json;
994     }
995
996     // these may be overridden by subclasses or junit tests
997
998     /**
999      * Gets the retry count.
1000      *
1001      * @param retry retry, extracted from the parameters, or {@code null}
1002      * @return the number of retries, or {@code 0} if no retries were specified
1003      */
1004     protected int getRetry(Integer retry) {
1005         return (retry == null ? 0 : retry);
1006     }
1007
1008     /**
1009      * Gets the retry wait, in milliseconds.
1010      *
1011      * @return the retry wait, in milliseconds
1012      */
1013     protected long getRetryWaitMs() {
1014         return DEFAULT_RETRY_WAIT_MS;
1015     }
1016
1017     /**
1018      * Gets the operation timeout.
1019      *
1020      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1021      *        {@code null}
1022      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1023      *         specified
1024      */
1025     protected long getTimeoutMs(Integer timeoutSec) {
1026         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1027     }
1028
1029     // these may be overridden by junit tests
1030
1031     protected Coder makeCoder() {
1032         return coder;
1033     }
1034 }