Change payload to Map<String,Object> so it's more versatile
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedHashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.concurrent.CompletionException;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.function.BiConsumer;
37 import java.util.function.Function;
38 import java.util.function.Supplier;
39 import java.util.function.UnaryOperator;
40 import lombok.Getter;
41 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
42 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
43 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
44 import org.onap.policy.common.utils.coder.Coder;
45 import org.onap.policy.common.utils.coder.CoderException;
46 import org.onap.policy.common.utils.coder.StandardCoder;
47 import org.onap.policy.controlloop.ControlLoopOperation;
48 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
49 import org.onap.policy.controlloop.actorserviceprovider.Operation;
50 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
51 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
52 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
53 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
54 import org.onap.policy.controlloop.policy.PolicyResult;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * Partial implementation of an operator. In general, it's preferable that subclasses
60  * would override {@link #startOperationAsync(int, OperationOutcome)
61  * startOperationAsync()}. However, if that proves to be too difficult, then they can
62  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
63  * if the operation requires any preprocessor steps, the subclass may choose to override
64  * {@link #startPreprocessorAsync()}.
65  * <p/>
66  * The futures returned by the methods within this class can be canceled, and will
67  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
68  * returned by overridden methods will do the same. Of course, if a class overrides
69  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
70  * be done to cancel that particular operation.
71  */
72 public abstract class OperationPartial implements Operation {
73     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
74     private static final Coder coder = new StandardCoder();
75
76     public static final String GUARD_ACTOR_NAME = "GUARD";
77     public static final String GUARD_OPERATION_NAME = "Decision";
78     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
79
80     private final OperatorConfig config;
81
82     /**
83      * Operation parameters.
84      */
85     protected final ControlLoopOperationParams params;
86
87     @Getter
88     private final String fullName;
89
90
91     /**
92      * Constructs the object.
93      *
94      * @param params operation parameters
95      * @param config configuration for this operation
96      */
97     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
98         this.params = params;
99         this.config = config;
100         this.fullName = params.getActor() + "." + params.getOperation();
101     }
102
103     public Executor getBlockingExecutor() {
104         return config.getBlockingExecutor();
105     }
106
107     public String getActorName() {
108         return params.getActor();
109     }
110
111     public String getName() {
112         return params.getOperation();
113     }
114
115     @Override
116     public final CompletableFuture<OperationOutcome> start() {
117         // allocate a controller for the entire operation
118         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
119
120         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
121         if (preproc == null) {
122             // no preprocessor required - just start the operation
123             return startOperationAttempt(controller, 1);
124         }
125
126         /*
127          * Do preprocessor first and then, if successful, start the operation. Note:
128          * operations create their own outcome, ignoring the outcome from any previous
129          * steps.
130          *
131          * Wrap the preprocessor to ensure "stop" is propagated to it.
132          */
133         // @formatter:off
134         controller.wrap(preproc)
135                         .exceptionally(fromException("preprocessor of operation"))
136                         .thenCompose(handlePreprocessorFailure(controller))
137                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
138                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
139         // @formatter:on
140
141         return controller;
142     }
143
144     /**
145      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
146      * invokes the call-backs, marks the controller complete, and returns an incomplete
147      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
148      * received.
149      * <p/>
150      * Assumes that no callbacks have been invoked yet.
151      *
152      * @param controller pipeline controller
153      * @return a function that checks the outcome status and continues, if successful, or
154      *         indicates a failure otherwise
155      */
156     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
157                     PipelineControllerFuture<OperationOutcome> controller) {
158
159         return outcome -> {
160
161             if (outcome != null && isSuccess(outcome)) {
162                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
163                 return CompletableFuture.completedFuture(outcome);
164             }
165
166             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
167
168             final Executor executor = params.getExecutor();
169             final CallbackManager callbacks = new CallbackManager();
170
171             // propagate "stop" to the callbacks
172             controller.add(callbacks);
173
174             final OperationOutcome outcome2 = params.makeOutcome();
175
176             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
177
178             outcome2.setResult(PolicyResult.FAILURE_GUARD);
179             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
180
181             // @formatter:off
182             CompletableFuture.completedFuture(outcome2)
183                             .whenCompleteAsync(callbackStarted(callbacks), executor)
184                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
185                             .whenCompleteAsync(controller.delayedComplete(), executor);
186             // @formatter:on
187
188             return new CompletableFuture<>();
189         };
190     }
191
192     /**
193      * Invokes the operation's preprocessor step(s) as a "future". This method simply
194      * returns {@code null}.
195      * <p/>
196      * This method assumes the following:
197      * <ul>
198      * <li>the operator is alive</li>
199      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
200      * </ul>
201      *
202      * @return a function that will start the preprocessor and returns its outcome, or
203      *         {@code null} if this operation needs no preprocessor
204      */
205     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
206         return null;
207     }
208
209     /**
210      * Invokes the operation's guard step(s) as a "future".
211      * <p/>
212      * This method assumes the following:
213      * <ul>
214      * <li>the operator is alive</li>
215      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
216      * </ul>
217      *
218      * @return a function that will start the guard checks and returns its outcome, or
219      *         {@code null} if this operation has no guard
220      */
221     protected CompletableFuture<OperationOutcome> startGuardAsync() {
222         // get the guard payload
223         Map<String,Object> guardPayload = makeGuardPayload();
224
225         // wrap it in a "resource"
226         Map<String,Object> resource = new LinkedHashMap<>();
227         resource.put("guard", guardPayload);
228
229         Map<String,Object> payload = new LinkedHashMap<>();
230         payload.put("resource", resource);
231
232         /*
233          * Note: can't use constants from actor.guard, because that would create a
234          * circular dependency.
235          */
236         return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
237                         .payload(payload).build().start();
238     }
239
240     /**
241      * Creates a payload to execute a guard operation.
242      *
243      * @return a new guard payload
244      */
245     protected Map<String, Object> makeGuardPayload() {
246         Map<String, Object> guard = new LinkedHashMap<>();
247         guard.put("actor", params.getActor());
248         guard.put("recipe", params.getOperation());
249         guard.put("target", params.getTargetEntity());
250         guard.put("requestId", params.getRequestId());
251
252         String clname = params.getContext().getEvent().getClosedLoopControlName();
253         if (clname != null) {
254             guard.put("clname", clname);
255         }
256
257         return guard;
258     }
259
260     /**
261      * Starts the operation attempt, with no preprocessor. When all retries complete, it
262      * will complete the controller.
263      *
264      * @param controller controller for all operation attempts
265      * @param attempt attempt number, typically starting with 1
266      * @return a future that will return the final result of all attempts
267      */
268     private CompletableFuture<OperationOutcome> startOperationAttempt(
269                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
270
271         // propagate "stop" to the operation attempt
272         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
273                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
274
275         return controller;
276     }
277
278     /**
279      * Starts the operation attempt, without doing any retries.
280      *
281      * @param params operation parameters
282      * @param attempt attempt number, typically starting with 1
283      * @return a future that will return the result of a single operation attempt
284      */
285     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
286
287         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
288
289         final Executor executor = params.getExecutor();
290         final OperationOutcome outcome = params.makeOutcome();
291         final CallbackManager callbacks = new CallbackManager();
292
293         // this operation attempt gets its own controller
294         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
295
296         // propagate "stop" to the callbacks
297         controller.add(callbacks);
298
299         // @formatter:off
300         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
301                         .whenCompleteAsync(callbackStarted(callbacks), executor)
302                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
303         // @formatter:on
304
305         // handle timeouts, if specified
306         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
307         if (timeoutMillis > 0) {
308             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
309             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
310         }
311
312         /*
313          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
314          * before callbackCompleted() is invoked.
315          *
316          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
317          * the pipeline as the last step anyway.
318          */
319
320         // @formatter:off
321         future.exceptionally(fromException("operation"))
322                     .thenApply(setRetryFlag(attempt))
323                     .whenCompleteAsync(callbackStarted(callbacks), executor)
324                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
325                     .whenCompleteAsync(controller.delayedComplete(), executor);
326         // @formatter:on
327
328         return controller;
329     }
330
331     /**
332      * Determines if the outcome was successful.
333      *
334      * @param outcome outcome to examine
335      * @return {@code true} if the outcome was successful
336      */
337     protected boolean isSuccess(OperationOutcome outcome) {
338         return (outcome.getResult() == PolicyResult.SUCCESS);
339     }
340
341     /**
342      * Determines if the outcome was a failure for this operator.
343      *
344      * @param outcome outcome to examine, or {@code null}
345      * @return {@code true} if the outcome is not {@code null} and was a failure
346      *         <i>and</i> was associated with this operator, {@code false} otherwise
347      */
348     protected boolean isActorFailed(OperationOutcome outcome) {
349         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
350     }
351
352     /**
353      * Determines if the given outcome is for this operation.
354      *
355      * @param outcome outcome to examine
356      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
357      */
358     protected boolean isSameOperation(OperationOutcome outcome) {
359         return OperationOutcome.isFor(outcome, getActorName(), getName());
360     }
361
362     /**
363      * Invokes the operation as a "future". This method simply invokes
364      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
365      * returning the result via a "future".
366      * <p/>
367      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
368      * the executor in the "params", as that may bring the background thread pool to a
369      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
370      * instead.
371      * <p/>
372      * This method assumes the following:
373      * <ul>
374      * <li>the operator is alive</li>
375      * <li>verifyRunning() has been invoked</li>
376      * <li>callbackStarted() has been invoked</li>
377      * <li>the invoker will perform appropriate timeout checks</li>
378      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
379      * </ul>
380      *
381      * @param attempt attempt number, typically starting with 1
382      * @return a function that will start the operation and return its result when
383      *         complete
384      */
385     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
386
387         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
388     }
389
390     /**
391      * Low-level method that performs the operation. This can make the same assumptions
392      * that are made by {@link #doOperationAsFuture()}. This particular method simply
393      * throws an {@link UnsupportedOperationException}.
394      *
395      * @param attempt attempt number, typically starting with 1
396      * @param operation the operation being performed
397      * @return the outcome of the operation
398      */
399     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
400
401         throw new UnsupportedOperationException("start operation " + getFullName());
402     }
403
404     /**
405      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
406      * FAILURE, assuming the policy specifies retries and the retry count has been
407      * exhausted.
408      *
409      * @param attempt latest attempt number, starting with 1
410      * @return a function to get the next future to execute
411      */
412     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
413
414         return operation -> {
415             if (operation != null && !isActorFailed(operation)) {
416                 /*
417                  * wrong type or wrong operation - just leave it as is. No need to log
418                  * anything here, as retryOnFailure() will log a message
419                  */
420                 return operation;
421             }
422
423             // get a non-null operation
424             OperationOutcome oper2;
425             if (operation != null) {
426                 oper2 = operation;
427             } else {
428                 oper2 = params.makeOutcome();
429                 oper2.setResult(PolicyResult.FAILURE);
430             }
431
432             int retry = getRetry(params.getRetry());
433             if (retry > 0 && attempt > retry) {
434                 /*
435                  * retries were specified and we've already tried them all - change to
436                  * FAILURE_RETRIES
437                  */
438                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
439                 oper2.setResult(PolicyResult.FAILURE_RETRIES);
440             }
441
442             return oper2;
443         };
444     }
445
446     /**
447      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
448      * was previously invoked, and thus that the "operation" is not {@code null}.
449      *
450      * @param controller controller for all of the retries
451      * @param attempt latest attempt number, starting with 1
452      * @return a function to get the next future to execute
453      */
454     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
455                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
456
457         return operation -> {
458             if (!isActorFailed(operation)) {
459                 // wrong type or wrong operation - just leave it as is
460                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
461                 controller.complete(operation);
462                 return new CompletableFuture<>();
463             }
464
465             if (getRetry(params.getRetry()) <= 0) {
466                 // no retries - already marked as FAILURE, so just return it
467                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
468                 controller.complete(operation);
469                 return new CompletableFuture<>();
470             }
471
472             /*
473              * Retry the operation.
474              */
475             long waitMs = getRetryWaitMs();
476             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
477
478             return sleep(waitMs, TimeUnit.MILLISECONDS)
479                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
480         };
481     }
482
483     /**
484      * Convenience method that starts a sleep(), running via a future.
485      *
486      * @param sleepTime time to sleep
487      * @param unit time unit
488      * @return a future that will complete when the sleep completes
489      */
490     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
491         if (sleepTime <= 0) {
492             return CompletableFuture.completedFuture(null);
493         }
494
495         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
496     }
497
498     /**
499      * Converts an exception into an operation outcome, returning a copy of the outcome to
500      * prevent background jobs from changing it.
501      *
502      * @param type type of item throwing the exception
503      * @return a function that will convert an exception into an operation outcome
504      */
505     private Function<Throwable, OperationOutcome> fromException(String type) {
506
507         return thrown -> {
508             OperationOutcome outcome = params.makeOutcome();
509
510             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
511                             params.getRequestId(), thrown);
512
513             return setOutcome(outcome, thrown);
514         };
515     }
516
517     /**
518      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
519      * any outstanding futures when one completes.
520      *
521      * @param futureMakers function to make a future. If the function returns
522      *        {@code null}, then no future is created for that function. On the other
523      *        hand, if the function throws an exception, then the previously created
524      *        functions are canceled and the exception is re-thrown
525      * @return a future to cancel or await an outcome, or {@code null} if no futures were
526      *         created. If this future is canceled, then all of the futures will be
527      *         canceled
528      */
529     protected CompletableFuture<OperationOutcome> anyOf(
530                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
531
532         return anyOf(Arrays.asList(futureMakers));
533     }
534
535     /**
536      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
537      * any outstanding futures when one completes.
538      *
539      * @param futureMakers function to make a future. If the function returns
540      *        {@code null}, then no future is created for that function. On the other
541      *        hand, if the function throws an exception, then the previously created
542      *        functions are canceled and the exception is re-thrown
543      * @return a future to cancel or await an outcome, or {@code null} if no futures were
544      *         created. If this future is canceled, then all of the futures will be
545      *         canceled. Similarly, when this future completes, any incomplete futures
546      *         will be canceled
547      */
548     protected CompletableFuture<OperationOutcome> anyOf(
549                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
550
551         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
552
553         CompletableFuture<OperationOutcome>[] futures =
554                         attachFutures(controller, futureMakers, UnaryOperator.identity());
555
556         if (futures.length == 0) {
557             // no futures were started
558             return null;
559         }
560
561         if (futures.length == 1) {
562             return futures[0];
563         }
564
565         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
566                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
567
568         return controller;
569     }
570
571     /**
572      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
573      *
574      * @param futureMakers function to make a future. If the function returns
575      *        {@code null}, then no future is created for that function. On the other
576      *        hand, if the function throws an exception, then the previously created
577      *        functions are canceled and the exception is re-thrown
578      * @return a future to cancel or await an outcome, or {@code null} if no futures were
579      *         created. If this future is canceled, then all of the futures will be
580      *         canceled
581      */
582     protected CompletableFuture<OperationOutcome> allOf(
583                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
584
585         return allOf(Arrays.asList(futureMakers));
586     }
587
588     /**
589      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
590      *
591      * @param futureMakers function to make a future. If the function returns
592      *        {@code null}, then no future is created for that function. On the other
593      *        hand, if the function throws an exception, then the previously created
594      *        functions are canceled and the exception is re-thrown
595      * @return a future to cancel or await an outcome, or {@code null} if no futures were
596      *         created. If this future is canceled, then all of the futures will be
597      *         canceled. Similarly, when this future completes, any incomplete futures
598      *         will be canceled
599      */
600     protected CompletableFuture<OperationOutcome> allOf(
601                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
602         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
603
604         Queue<OperationOutcome> outcomes = new LinkedList<>();
605
606         CompletableFuture<OperationOutcome>[] futures =
607                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
608                             synchronized (outcomes) {
609                                 outcomes.add(outcome);
610                             }
611                             return outcome;
612                         }));
613
614         if (futures.length == 0) {
615             // no futures were started
616             return null;
617         }
618
619         if (futures.length == 1) {
620             return futures[0];
621         }
622
623         // @formatter:off
624         CompletableFuture.allOf(futures)
625                         .thenApply(unused -> combineOutcomes(outcomes))
626                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
627         // @formatter:on
628
629         return controller;
630     }
631
632     /**
633      * Invokes the functions to create the futures and attaches them to the controller.
634      *
635      * @param controller master controller for all of the futures
636      * @param futureMakers futures to be attached to the controller
637      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
638      *        Returns the adorned future
639      * @return an array of futures, possibly zero-length. If the array is of size one,
640      *         then that one item should be returned instead of the controller
641      */
642     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
643                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
644                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
645
646         if (futureMakers.isEmpty()) {
647             @SuppressWarnings("unchecked")
648             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
649             return result;
650         }
651
652         // the last, unadorned future that is created
653         CompletableFuture<OperationOutcome> lastFuture = null;
654
655         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
656
657         // make each future
658         for (var maker : futureMakers) {
659             try {
660                 CompletableFuture<OperationOutcome> future = maker.get();
661                 if (future == null) {
662                     continue;
663                 }
664
665                 // propagate "stop" to the future
666                 controller.add(future);
667
668                 futures.add(adorn.apply(future));
669
670                 lastFuture = future;
671
672             } catch (RuntimeException e) {
673                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
674                 controller.cancel(false);
675                 throw e;
676             }
677         }
678
679         @SuppressWarnings("unchecked")
680         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
681
682         if (result.length == 1) {
683             // special case - return the unadorned future
684             result[0] = lastFuture;
685             return result;
686         }
687
688         return futures.toArray(result);
689     }
690
691     /**
692      * Combines the outcomes from a set of tasks.
693      *
694      * @param outcomes outcomes to be examined
695      * @return the combined outcome
696      */
697     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
698
699         // identify the outcome with the highest priority
700         OperationOutcome outcome = outcomes.remove();
701         int priority = detmPriority(outcome);
702
703         for (OperationOutcome outcome2 : outcomes) {
704             int priority2 = detmPriority(outcome2);
705
706             if (priority2 > priority) {
707                 outcome = outcome2;
708                 priority = priority2;
709             }
710         }
711
712         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
713                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
714
715         return outcome;
716     }
717
718     /**
719      * Determines the priority of an outcome based on its result.
720      *
721      * @param outcome outcome to examine, or {@code null}
722      * @return the outcome's priority
723      */
724     protected int detmPriority(OperationOutcome outcome) {
725         if (outcome == null || outcome.getResult() == null) {
726             return 1;
727         }
728
729         switch (outcome.getResult()) {
730             case SUCCESS:
731                 return 0;
732
733             case FAILURE_GUARD:
734                 return 2;
735
736             case FAILURE_RETRIES:
737                 return 3;
738
739             case FAILURE:
740                 return 4;
741
742             case FAILURE_TIMEOUT:
743                 return 5;
744
745             case FAILURE_EXCEPTION:
746             default:
747                 return 6;
748         }
749     }
750
751     /**
752      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
753      * not created until the previous task completes. The pipeline returns the outcome of
754      * the last task executed.
755      *
756      * @param futureMakers functions to make the futures
757      * @return a future to cancel the sequence or await the outcome
758      */
759     protected CompletableFuture<OperationOutcome> sequence(
760                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
761
762         return sequence(Arrays.asList(futureMakers));
763     }
764
765     /**
766      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
767      * not created until the previous task completes. The pipeline returns the outcome of
768      * the last task executed.
769      *
770      * @param futureMakers functions to make the futures
771      * @return a future to cancel the sequence or await the outcome, or {@code null} if
772      *         there were no tasks to perform
773      */
774     protected CompletableFuture<OperationOutcome> sequence(
775                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
776
777         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
778
779         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
780         if (nextTask == null) {
781             // no tasks
782             return null;
783         }
784
785         if (queue.isEmpty()) {
786             // only one task - just return it rather than wrapping it in a controller
787             return nextTask;
788         }
789
790         /*
791          * multiple tasks - need a controller to stop whichever task is currently
792          * executing
793          */
794         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
795         final Executor executor = params.getExecutor();
796
797         // @formatter:off
798         controller.wrap(nextTask)
799                     .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
800                     .whenCompleteAsync(controller.delayedComplete(), executor);
801         // @formatter:on
802
803         return controller;
804     }
805
806     /**
807      * Executes the next task in the queue, if the previous outcome was successful.
808      *
809      * @param controller pipeline controller
810      * @param taskQueue queue of tasks to be performed
811      * @return a future to execute the remaining tasks, or the current outcome, if it's a
812      *         failure, or if there are no more tasks
813      */
814     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
815                     PipelineControllerFuture<OperationOutcome> controller,
816                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
817
818         return outcome -> {
819             if (!isSuccess(outcome)) {
820                 // return the failure
821                 return CompletableFuture.completedFuture(outcome);
822             }
823
824             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
825             if (nextTask == null) {
826                 // no tasks - just return the success
827                 return CompletableFuture.completedFuture(outcome);
828             }
829
830             // @formatter:off
831             return controller
832                         .wrap(nextTask)
833                         .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
834             // @formatter:on
835         };
836     }
837
838     /**
839      * Gets the next task from the queue, skipping those that are {@code null}.
840      *
841      * @param taskQueue task queue
842      * @return the next task, or {@code null} if the queue is now empty
843      */
844     private CompletableFuture<OperationOutcome> getNextTask(
845                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
846
847         Supplier<CompletableFuture<OperationOutcome>> maker;
848
849         while ((maker = taskQueue.poll()) != null) {
850             CompletableFuture<OperationOutcome> future = maker.get();
851             if (future != null) {
852                 return future;
853             }
854         }
855
856         return null;
857     }
858
859     /**
860      * Sets the start time of the operation and invokes the callback to indicate that the
861      * operation has started. Does nothing if the pipeline has been stopped.
862      * <p/>
863      * This assumes that the "outcome" is not {@code null}.
864      *
865      * @param callbacks used to determine if the start callback can be invoked
866      * @return a function that sets the start time and invokes the callback
867      */
868     private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
869
870         return (outcome, thrown) -> {
871
872             if (callbacks.canStart()) {
873                 // haven't invoked "start" callback yet
874                 outcome.setStart(callbacks.getStartTime());
875                 outcome.setEnd(null);
876                 params.callbackStarted(outcome);
877             }
878         };
879     }
880
881     /**
882      * Sets the end time of the operation and invokes the callback to indicate that the
883      * operation has completed. Does nothing if the pipeline has been stopped.
884      * <p/>
885      * This assumes that the "outcome" is not {@code null}.
886      * <p/>
887      * Note: the start time must be a reference rather than a plain value, because it's
888      * value must be gotten on-demand, when the returned function is executed at a later
889      * time.
890      *
891      * @param callbacks used to determine if the end callback can be invoked
892      * @return a function that sets the end time and invokes the callback
893      */
894     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
895
896         return (outcome, thrown) -> {
897
898             if (callbacks.canEnd()) {
899                 outcome.setStart(callbacks.getStartTime());
900                 outcome.setEnd(callbacks.getEndTime());
901                 params.callbackCompleted(outcome);
902             }
903         };
904     }
905
906     /**
907      * Sets an operation's outcome and message, based on a throwable.
908      *
909      * @param operation operation to be updated
910      * @return the updated operation
911      */
912     protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
913         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
914         return setOutcome(operation, result);
915     }
916
917     /**
918      * Sets an operation's outcome and default message based on the result.
919      *
920      * @param operation operation to be updated
921      * @param result result of the operation
922      * @return the updated operation
923      */
924     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
925         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
926         operation.setResult(result);
927         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
928                         : ControlLoopOperation.FAILED_MSG);
929
930         return operation;
931     }
932
933     /**
934      * Determines if a throwable is due to a timeout.
935      *
936      * @param thrown throwable of interest
937      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
938      */
939     protected boolean isTimeout(Throwable thrown) {
940         if (thrown instanceof CompletionException) {
941             thrown = thrown.getCause();
942         }
943
944         return (thrown instanceof TimeoutException);
945     }
946
947     /**
948      * Logs a response. If the response is not of type, String, then it attempts to
949      * pretty-print it into JSON before logging.
950      *
951      * @param direction IN or OUT
952      * @param infra communication infrastructure on which it was published
953      * @param source source name (e.g., the URL or Topic name)
954      * @param response response to be logged
955      * @return the JSON text that was logged
956      */
957     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
958         String json;
959         try {
960             if (response == null) {
961                 json = null;
962             } else if (response instanceof String) {
963                 json = response.toString();
964             } else {
965                 json = makeCoder().encode(response, true);
966             }
967
968         } catch (CoderException e) {
969             String type = (direction == EventType.IN ? "response" : "request");
970             logger.warn("cannot pretty-print {}", type, e);
971             json = response.toString();
972         }
973
974         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
975
976         return json;
977     }
978
979     // these may be overridden by subclasses or junit tests
980
981     /**
982      * Gets the retry count.
983      *
984      * @param retry retry, extracted from the parameters, or {@code null}
985      * @return the number of retries, or {@code 0} if no retries were specified
986      */
987     protected int getRetry(Integer retry) {
988         return (retry == null ? 0 : retry);
989     }
990
991     /**
992      * Gets the retry wait, in milliseconds.
993      *
994      * @return the retry wait, in milliseconds
995      */
996     protected long getRetryWaitMs() {
997         return DEFAULT_RETRY_WAIT_MS;
998     }
999
1000     /**
1001      * Gets the operation timeout.
1002      *
1003      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1004      *        {@code null}
1005      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1006      *         specified
1007      */
1008     protected long getTimeoutMs(Integer timeoutSec) {
1009         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1010     }
1011
1012     // these may be overridden by junit tests
1013
1014     protected Coder makeCoder() {
1015         return coder;
1016     }
1017 }