Merge "Modify Actors to use properties when provided"
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedHashMap;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Queue;
32 import java.util.UUID;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CompletionException;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39 import java.util.function.BiConsumer;
40 import java.util.function.Function;
41 import java.util.function.Supplier;
42 import java.util.function.UnaryOperator;
43 import lombok.AccessLevel;
44 import lombok.Getter;
45 import lombok.Setter;
46 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
48 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
49 import org.onap.policy.common.utils.coder.Coder;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.controlloop.ControlLoopOperation;
53 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
54 import org.onap.policy.controlloop.actorserviceprovider.Operation;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
56 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
58 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
59 import org.onap.policy.controlloop.policy.PolicyResult;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * Partial implementation of an operator. In general, it's preferable that subclasses
65  * would override {@link #startOperationAsync(int, OperationOutcome)
66  * startOperationAsync()}. However, if that proves to be too difficult, then they can
67  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
68  * if the operation requires any preprocessor steps, the subclass may choose to override
69  * {@link #startPreprocessorAsync()}.
70  * <p/>
71  * The futures returned by the methods within this class can be canceled, and will
72  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
73  * returned by overridden methods will do the same. Of course, if a class overrides
74  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
75  * be done to cancel that particular operation.
76  * <p/>
77  * In general tasks in a pipeline are executed by the same thread. However, the following
78  * should always be executed via the executor specified in "params":
79  * <ul>
80  * <li>start callback</li>
81  * <li>completion callback</li>
82  * <li>controller completion (i.e., delayedComplete())</li>
83  * </ul>
84  */
85 public abstract class OperationPartial implements Operation {
86     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
87     private static final Coder coder = new StandardCoder();
88
89     public static final String GUARD_ACTOR_NAME = "GUARD";
90     public static final String GUARD_OPERATION_NAME = "Decision";
91     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
92
93     private final OperatorConfig config;
94
95     /**
96      * Operation parameters.
97      */
98     protected final ControlLoopOperationParams params;
99
100     @Getter
101     private final String fullName;
102
103     @Getter
104     @Setter(AccessLevel.PROTECTED)
105     private String subRequestId;
106
107     @Getter
108     private final List<String> propertyNames;
109
110     /**
111      * Values for the properties identified by {@link #getPropertyNames()}.
112      */
113     private final Map<String, Object> properties = new HashMap<>();
114
115
116     /**
117      * Constructs the object.
118      *
119      * @param params operation parameters
120      * @param config configuration for this operation
121      * @param propertyNames names of properties required by this operation
122      */
123     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
124         this.params = params;
125         this.config = config;
126         this.fullName = params.getActor() + "." + params.getOperation();
127         this.propertyNames = propertyNames;
128     }
129
130     public Executor getBlockingExecutor() {
131         return config.getBlockingExecutor();
132     }
133
134     public String getActorName() {
135         return params.getActor();
136     }
137
138     public String getName() {
139         return params.getOperation();
140     }
141
142     /**
143      * Determines if a property has been assigned for the operation.
144      *
145      * @param name property name
146      * @return {@code true} if the given property has been assigned for the operation,
147      *         {@code false} otherwise
148      */
149     public boolean containsProperty(String name) {
150         return properties.containsKey(name);
151     }
152
153     /**
154      * Sets a property.
155      *
156      * @param name property name
157      * @param value new value
158      */
159     public void setProperty(String name, Object value) {
160         properties.put(name, value);
161     }
162
163     /**
164      * Gets a property's value.
165      *
166      * @param name name of the property of interest
167      * @return the property's value, or {@code null} if it has no value
168      */
169     @SuppressWarnings("unchecked")
170     public <T> T getProperty(String name) {
171         return (T) properties.get(name);
172     }
173
174     @Override
175     public CompletableFuture<OperationOutcome> start() {
176         // allocate a controller for the entire operation
177         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
178
179         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
180         if (preproc == null) {
181             // no preprocessor required - just start the operation
182             return startOperationAttempt(controller, 1);
183         }
184
185         /*
186          * Do preprocessor first and then, if successful, start the operation. Note:
187          * operations create their own outcome, ignoring the outcome from any previous
188          * steps.
189          *
190          * Wrap the preprocessor to ensure "stop" is propagated to it.
191          */
192         // @formatter:off
193         controller.wrap(preproc)
194                         .exceptionally(fromException("preprocessor of operation"))
195                         .thenCompose(handlePreprocessorFailure(controller))
196                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
197                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
198         // @formatter:on
199
200         return controller;
201     }
202
203     /**
204      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
205      * invokes the call-backs, marks the controller complete, and returns an incomplete
206      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
207      * received.
208      * <p/>
209      * Assumes that no callbacks have been invoked yet.
210      *
211      * @param controller pipeline controller
212      * @return a function that checks the outcome status and continues, if successful, or
213      *         indicates a failure otherwise
214      */
215     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
216                     PipelineControllerFuture<OperationOutcome> controller) {
217
218         return outcome -> {
219
220             if (isSuccess(outcome)) {
221                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
222                 return CompletableFuture.completedFuture(outcome);
223             }
224
225             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
226
227             final Executor executor = params.getExecutor();
228             final CallbackManager callbacks = new CallbackManager();
229
230             // propagate "stop" to the callbacks
231             controller.add(callbacks);
232
233             final OperationOutcome outcome2 = params.makeOutcome();
234
235             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
236
237             outcome2.setFinalOutcome(true);
238             outcome2.setResult(PolicyResult.FAILURE_GUARD);
239             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
240
241             // @formatter:off
242             CompletableFuture.completedFuture(outcome2)
243                             .whenCompleteAsync(callbackStarted(callbacks), executor)
244                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
245                             .whenCompleteAsync(controller.delayedComplete(), executor);
246             // @formatter:on
247
248             return new CompletableFuture<>();
249         };
250     }
251
252     /**
253      * Invokes the operation's preprocessor step(s) as a "future". This method simply
254      * returns {@code null}.
255      * <p/>
256      * This method assumes the following:
257      * <ul>
258      * <li>the operator is alive</li>
259      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
260      * </ul>
261      *
262      * @return a function that will start the preprocessor and returns its outcome, or
263      *         {@code null} if this operation needs no preprocessor
264      */
265     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
266         return null;
267     }
268
269     /**
270      * Invokes the operation's guard step(s) as a "future".
271      * <p/>
272      * This method assumes the following:
273      * <ul>
274      * <li>the operator is alive</li>
275      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
276      * </ul>
277      *
278      * @return a function that will start the guard checks and returns its outcome, or
279      *         {@code null} if this operation has no guard
280      */
281     protected CompletableFuture<OperationOutcome> startGuardAsync() {
282         if (params.isPreprocessed()) {
283             return null;
284         }
285
286         // get the guard payload
287         Map<String, Object> payload = makeGuardPayload();
288
289         /*
290          * Note: can't use constants from actor.guard, because that would create a
291          * circular dependency.
292          */
293         return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
294                         .payload(payload).build().start();
295     }
296
297     /**
298      * Creates a payload to execute a guard operation.
299      *
300      * @return a new guard payload
301      */
302     protected Map<String, Object> makeGuardPayload() {
303         // TODO delete this once preprocessing is done by the application
304         Map<String, Object> guard = new LinkedHashMap<>();
305         guard.put("actor", params.getActor());
306         guard.put("operation", params.getOperation());
307         guard.put("target", params.getTargetEntity());
308         guard.put("requestId", params.getRequestId());
309
310         String clname = params.getContext().getEvent().getClosedLoopControlName();
311         if (clname != null) {
312             guard.put("clname", clname);
313         }
314
315         return guard;
316     }
317
318     /**
319      * Starts the operation attempt, with no preprocessor. When all retries complete, it
320      * will complete the controller.
321      *
322      * @param controller controller for all operation attempts
323      * @param attempt attempt number, typically starting with 1
324      * @return a future that will return the final result of all attempts
325      */
326     private CompletableFuture<OperationOutcome> startOperationAttempt(
327                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
328
329         generateSubRequestId(attempt);
330
331         // propagate "stop" to the operation attempt
332         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
333                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
334
335         return controller;
336     }
337
338     /**
339      * Generates and sets {@link #subRequestId} to a new subrequest ID.
340      *
341      * @param attempt attempt number, typically starting with 1
342      */
343     public void generateSubRequestId(int attempt) {
344         // Note: this should be "protected", but that makes junits much messier
345
346         setSubRequestId(UUID.randomUUID().toString());
347     }
348
349     /**
350      * Starts the operation attempt, without doing any retries.
351      *
352      * @param params operation parameters
353      * @param attempt attempt number, typically starting with 1
354      * @return a future that will return the result of a single operation attempt
355      */
356     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
357
358         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
359
360         final Executor executor = params.getExecutor();
361         final OperationOutcome outcome = params.makeOutcome();
362         final CallbackManager callbacks = new CallbackManager();
363
364         // this operation attempt gets its own controller
365         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
366
367         // propagate "stop" to the callbacks
368         controller.add(callbacks);
369
370         // @formatter:off
371         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
372                         .whenCompleteAsync(callbackStarted(callbacks), executor)
373                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
374         // @formatter:on
375
376         // handle timeouts, if specified
377         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
378         if (timeoutMillis > 0) {
379             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
380             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
381         }
382
383         /*
384          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
385          * before callbackCompleted() is invoked.
386          *
387          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
388          * the pipeline as the last step anyway.
389          */
390
391         // @formatter:off
392         future.exceptionally(fromException("operation"))
393                     .thenApply(setRetryFlag(attempt))
394                     .whenCompleteAsync(callbackStarted(callbacks), executor)
395                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
396                     .whenCompleteAsync(controller.delayedComplete(), executor);
397         // @formatter:on
398
399         return controller;
400     }
401
402     /**
403      * Determines if the outcome was successful.
404      *
405      * @param outcome outcome to examine
406      * @return {@code true} if the outcome was successful
407      */
408     protected boolean isSuccess(OperationOutcome outcome) {
409         return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
410     }
411
412     /**
413      * Determines if the outcome was a failure for this operator.
414      *
415      * @param outcome outcome to examine, or {@code null}
416      * @return {@code true} if the outcome is not {@code null} and was a failure
417      *         <i>and</i> was associated with this operator, {@code false} otherwise
418      */
419     protected boolean isActorFailed(OperationOutcome outcome) {
420         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
421     }
422
423     /**
424      * Determines if the given outcome is for this operation.
425      *
426      * @param outcome outcome to examine
427      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
428      */
429     protected boolean isSameOperation(OperationOutcome outcome) {
430         return OperationOutcome.isFor(outcome, getActorName(), getName());
431     }
432
433     /**
434      * Invokes the operation as a "future". This method simply invokes
435      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
436      * returning the result via a "future".
437      * <p/>
438      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
439      * the executor in the "params", as that may bring the background thread pool to a
440      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
441      * instead.
442      * <p/>
443      * This method assumes the following:
444      * <ul>
445      * <li>the operator is alive</li>
446      * <li>verifyRunning() has been invoked</li>
447      * <li>callbackStarted() has been invoked</li>
448      * <li>the invoker will perform appropriate timeout checks</li>
449      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
450      * </ul>
451      *
452      * @param attempt attempt number, typically starting with 1
453      * @return a function that will start the operation and return its result when
454      *         complete
455      */
456     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
457
458         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
459     }
460
461     /**
462      * Low-level method that performs the operation. This can make the same assumptions
463      * that are made by {@link #doOperationAsFuture()}. This particular method simply
464      * throws an {@link UnsupportedOperationException}.
465      *
466      * @param attempt attempt number, typically starting with 1
467      * @param operation the operation being performed
468      * @return the outcome of the operation
469      */
470     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
471
472         throw new UnsupportedOperationException("start operation " + getFullName());
473     }
474
475     /**
476      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
477      * FAILURE, assuming the policy specifies retries and the retry count has been
478      * exhausted.
479      *
480      * @param attempt latest attempt number, starting with 1
481      * @return a function to get the next future to execute
482      */
483     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
484
485         return origOutcome -> {
486             // ensure we have a non-null outcome
487             OperationOutcome outcome;
488             if (origOutcome != null) {
489                 outcome = origOutcome;
490             } else {
491                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
492                 outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
493             }
494
495             // ensure correct actor/operation
496             outcome.setActor(getActorName());
497             outcome.setOperation(getName());
498
499             // determine if we should retry, based on the result
500             if (outcome.getResult() != PolicyResult.FAILURE) {
501                 // do not retry success or other failure types (e.g., exception)
502                 outcome.setFinalOutcome(true);
503                 return outcome;
504             }
505
506             int retry = getRetry(params.getRetry());
507             if (retry <= 0) {
508                 // no retries were specified
509                 outcome.setFinalOutcome(true);
510
511             } else if (attempt <= retry) {
512                 // have more retries - not the final outcome
513                 outcome.setFinalOutcome(false);
514
515             } else {
516                 /*
517                  * retries were specified and we've already tried them all - change to
518                  * FAILURE_RETRIES
519                  */
520                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
521                 outcome.setResult(PolicyResult.FAILURE_RETRIES);
522                 outcome.setFinalOutcome(true);
523             }
524
525             return outcome;
526         };
527     }
528
529     /**
530      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
531      * was previously invoked, and thus that the "operation" is not {@code null}.
532      *
533      * @param controller controller for all of the retries
534      * @param attempt latest attempt number, starting with 1
535      * @return a function to get the next future to execute
536      */
537     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
538                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
539
540         return operation -> {
541             if (!isActorFailed(operation)) {
542                 // wrong type or wrong operation - just leave it as is
543                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
544                 controller.complete(operation);
545                 return new CompletableFuture<>();
546             }
547
548             if (getRetry(params.getRetry()) <= 0) {
549                 // no retries - already marked as FAILURE, so just return it
550                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
551                 controller.complete(operation);
552                 return new CompletableFuture<>();
553             }
554
555             /*
556              * Retry the operation.
557              */
558             long waitMs = getRetryWaitMs();
559             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
560
561             return sleep(waitMs, TimeUnit.MILLISECONDS)
562                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
563         };
564     }
565
566     /**
567      * Convenience method that starts a sleep(), running via a future.
568      *
569      * @param sleepTime time to sleep
570      * @param unit time unit
571      * @return a future that will complete when the sleep completes
572      */
573     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
574         if (sleepTime <= 0) {
575             return CompletableFuture.completedFuture(null);
576         }
577
578         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
579     }
580
581     /**
582      * Converts an exception into an operation outcome, returning a copy of the outcome to
583      * prevent background jobs from changing it.
584      *
585      * @param type type of item throwing the exception
586      * @return a function that will convert an exception into an operation outcome
587      */
588     private Function<Throwable, OperationOutcome> fromException(String type) {
589
590         return thrown -> {
591             OperationOutcome outcome = params.makeOutcome();
592
593             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
594                 // do not include exception in the message, as it just clutters the log
595                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
596                                 params.getRequestId());
597             } else {
598                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
599                                 params.getRequestId(), thrown);
600             }
601
602             return setOutcome(outcome, thrown);
603         };
604     }
605
606     /**
607      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
608      * any outstanding futures when one completes.
609      *
610      * @param futureMakers function to make a future. If the function returns
611      *        {@code null}, then no future is created for that function. On the other
612      *        hand, if the function throws an exception, then the previously created
613      *        functions are canceled and the exception is re-thrown
614      * @return a future to cancel or await an outcome, or {@code null} if no futures were
615      *         created. If this future is canceled, then all of the futures will be
616      *         canceled
617      */
618     public CompletableFuture<OperationOutcome> anyOf(
619                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
620
621         return anyOf(Arrays.asList(futureMakers));
622     }
623
624     /**
625      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
626      * any outstanding futures when one completes.
627      *
628      * @param futureMakers function to make a future. If the function returns
629      *        {@code null}, then no future is created for that function. On the other
630      *        hand, if the function throws an exception, then the previously created
631      *        functions are canceled and the exception is re-thrown
632      * @return a future to cancel or await an outcome, or {@code null} if no futures were
633      *         created. If this future is canceled, then all of the futures will be
634      *         canceled. Similarly, when this future completes, any incomplete futures
635      *         will be canceled
636      */
637     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
638
639         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
640
641         CompletableFuture<OperationOutcome>[] futures =
642                         attachFutures(controller, futureMakers, UnaryOperator.identity());
643
644         if (futures.length == 0) {
645             // no futures were started
646             return null;
647         }
648
649         if (futures.length == 1) {
650             return futures[0];
651         }
652
653         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
654                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
655
656         return controller;
657     }
658
659     /**
660      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
661      *
662      * @param futureMakers function to make a future. If the function returns
663      *        {@code null}, then no future is created for that function. On the other
664      *        hand, if the function throws an exception, then the previously created
665      *        functions are canceled and the exception is re-thrown
666      * @return a future to cancel or await an outcome, or {@code null} if no futures were
667      *         created. If this future is canceled, then all of the futures will be
668      *         canceled
669      */
670     public CompletableFuture<OperationOutcome> allOf(
671                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
672
673         return allOf(Arrays.asList(futureMakers));
674     }
675
676     /**
677      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
678      *
679      * @param futureMakers function to make a future. If the function returns
680      *        {@code null}, then no future is created for that function. On the other
681      *        hand, if the function throws an exception, then the previously created
682      *        functions are canceled and the exception is re-thrown
683      * @return a future to cancel or await an outcome, or {@code null} if no futures were
684      *         created. If this future is canceled, then all of the futures will be
685      *         canceled. Similarly, when this future completes, any incomplete futures
686      *         will be canceled
687      */
688     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
689         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
690
691         Queue<OperationOutcome> outcomes = new LinkedList<>();
692
693         CompletableFuture<OperationOutcome>[] futures =
694                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
695                             synchronized (outcomes) {
696                                 outcomes.add(outcome);
697                             }
698                             return outcome;
699                         }));
700
701         if (futures.length == 0) {
702             // no futures were started
703             return null;
704         }
705
706         if (futures.length == 1) {
707             return futures[0];
708         }
709
710         // @formatter:off
711         CompletableFuture.allOf(futures)
712                         .thenApply(unused -> combineOutcomes(outcomes))
713                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
714         // @formatter:on
715
716         return controller;
717     }
718
719     /**
720      * Invokes the functions to create the futures and attaches them to the controller.
721      *
722      * @param controller master controller for all of the futures
723      * @param futureMakers futures to be attached to the controller
724      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
725      *        Returns the adorned future
726      * @return an array of futures, possibly zero-length. If the array is of size one,
727      *         then that one item should be returned instead of the controller
728      */
729     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
730                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
731                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
732
733         if (futureMakers.isEmpty()) {
734             @SuppressWarnings("unchecked")
735             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
736             return result;
737         }
738
739         // the last, unadorned future that is created
740         CompletableFuture<OperationOutcome> lastFuture = null;
741
742         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
743
744         // make each future
745         for (var maker : futureMakers) {
746             try {
747                 CompletableFuture<OperationOutcome> future = maker.get();
748                 if (future == null) {
749                     continue;
750                 }
751
752                 // propagate "stop" to the future
753                 controller.add(future);
754
755                 futures.add(adorn.apply(future));
756
757                 lastFuture = future;
758
759             } catch (RuntimeException e) {
760                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
761                 controller.cancel(false);
762                 throw e;
763             }
764         }
765
766         @SuppressWarnings("unchecked")
767         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
768
769         if (result.length == 1) {
770             // special case - return the unadorned future
771             result[0] = lastFuture;
772             return result;
773         }
774
775         return futures.toArray(result);
776     }
777
778     /**
779      * Combines the outcomes from a set of tasks.
780      *
781      * @param outcomes outcomes to be examined
782      * @return the combined outcome
783      */
784     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
785
786         // identify the outcome with the highest priority
787         OperationOutcome outcome = outcomes.remove();
788         int priority = detmPriority(outcome);
789
790         for (OperationOutcome outcome2 : outcomes) {
791             int priority2 = detmPriority(outcome2);
792
793             if (priority2 > priority) {
794                 outcome = outcome2;
795                 priority = priority2;
796             }
797         }
798
799         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
800                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
801
802         return outcome;
803     }
804
805     /**
806      * Determines the priority of an outcome based on its result.
807      *
808      * @param outcome outcome to examine, or {@code null}
809      * @return the outcome's priority
810      */
811     protected int detmPriority(OperationOutcome outcome) {
812         if (outcome == null || outcome.getResult() == null) {
813             return 1;
814         }
815
816         switch (outcome.getResult()) {
817             case SUCCESS:
818                 return 0;
819
820             case FAILURE_GUARD:
821                 return 2;
822
823             case FAILURE_RETRIES:
824                 return 3;
825
826             case FAILURE:
827                 return 4;
828
829             case FAILURE_TIMEOUT:
830                 return 5;
831
832             case FAILURE_EXCEPTION:
833             default:
834                 return 6;
835         }
836     }
837
838     /**
839      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
840      * not created until the previous task completes. The pipeline returns the outcome of
841      * the last task executed.
842      *
843      * @param futureMakers functions to make the futures
844      * @return a future to cancel the sequence or await the outcome
845      */
846     public CompletableFuture<OperationOutcome> sequence(
847                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
848
849         return sequence(Arrays.asList(futureMakers));
850     }
851
852     /**
853      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
854      * not created until the previous task completes. The pipeline returns the outcome of
855      * the last task executed.
856      *
857      * @param futureMakers functions to make the futures
858      * @return a future to cancel the sequence or await the outcome, or {@code null} if
859      *         there were no tasks to perform
860      */
861     public CompletableFuture<OperationOutcome> sequence(
862                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
863
864         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
865
866         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
867         if (nextTask == null) {
868             // no tasks
869             return null;
870         }
871
872         if (queue.isEmpty()) {
873             // only one task - just return it rather than wrapping it in a controller
874             return nextTask;
875         }
876
877         /*
878          * multiple tasks - need a controller to stop whichever task is currently
879          * executing
880          */
881         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
882         final Executor executor = params.getExecutor();
883
884         // @formatter:off
885         controller.wrap(nextTask)
886                     .thenCompose(nextTaskOnSuccess(controller, queue))
887                     .whenCompleteAsync(controller.delayedComplete(), executor);
888         // @formatter:on
889
890         return controller;
891     }
892
893     /**
894      * Executes the next task in the queue, if the previous outcome was successful.
895      *
896      * @param controller pipeline controller
897      * @param taskQueue queue of tasks to be performed
898      * @return a future to execute the remaining tasks, or the current outcome, if it's a
899      *         failure, or if there are no more tasks
900      */
901     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
902                     PipelineControllerFuture<OperationOutcome> controller,
903                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
904
905         return outcome -> {
906             if (!isSuccess(outcome)) {
907                 // return the failure
908                 return CompletableFuture.completedFuture(outcome);
909             }
910
911             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
912             if (nextTask == null) {
913                 // no tasks - just return the success
914                 return CompletableFuture.completedFuture(outcome);
915             }
916
917             // @formatter:off
918             return controller
919                         .wrap(nextTask)
920                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
921             // @formatter:on
922         };
923     }
924
925     /**
926      * Gets the next task from the queue, skipping those that are {@code null}.
927      *
928      * @param taskQueue task queue
929      * @return the next task, or {@code null} if the queue is now empty
930      */
931     private CompletableFuture<OperationOutcome> getNextTask(
932                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
933
934         Supplier<CompletableFuture<OperationOutcome>> maker;
935
936         while ((maker = taskQueue.poll()) != null) {
937             CompletableFuture<OperationOutcome> future = maker.get();
938             if (future != null) {
939                 return future;
940             }
941         }
942
943         return null;
944     }
945
946     /**
947      * Sets the start time of the operation and invokes the callback to indicate that the
948      * operation has started. Does nothing if the pipeline has been stopped.
949      * <p/>
950      * This assumes that the "outcome" is not {@code null}.
951      *
952      * @param callbacks used to determine if the start callback can be invoked
953      * @return a function that sets the start time and invokes the callback
954      */
955     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
956
957         return (outcome, thrown) -> {
958
959             if (callbacks.canStart()) {
960                 outcome.setSubRequestId(getSubRequestId());
961                 outcome.setStart(callbacks.getStartTime());
962                 outcome.setEnd(null);
963
964                 // pass a copy to the callback
965                 OperationOutcome outcome2 = new OperationOutcome(outcome);
966                 outcome2.setFinalOutcome(false);
967                 params.callbackStarted(outcome2);
968             }
969         };
970     }
971
972     /**
973      * Sets the end time of the operation and invokes the callback to indicate that the
974      * operation has completed. Does nothing if the pipeline has been stopped.
975      * <p/>
976      * This assumes that the "outcome" is not {@code null}.
977      * <p/>
978      * Note: the start time must be a reference rather than a plain value, because it's
979      * value must be gotten on-demand, when the returned function is executed at a later
980      * time.
981      *
982      * @param callbacks used to determine if the end callback can be invoked
983      * @return a function that sets the end time and invokes the callback
984      */
985     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
986
987         return (outcome, thrown) -> {
988             if (callbacks.canEnd()) {
989                 outcome.setSubRequestId(getSubRequestId());
990                 outcome.setStart(callbacks.getStartTime());
991                 outcome.setEnd(callbacks.getEndTime());
992
993                 // pass a copy to the callback
994                 params.callbackCompleted(new OperationOutcome(outcome));
995             }
996         };
997     }
998
999     /**
1000      * Sets an operation's outcome and message, based on a throwable.
1001      *
1002      * @param operation operation to be updated
1003      * @return the updated operation
1004      */
1005     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
1006         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
1007         return setOutcome(operation, result);
1008     }
1009
1010     /**
1011      * Sets an operation's outcome and default message based on the result.
1012      *
1013      * @param operation operation to be updated
1014      * @param result result of the operation
1015      * @return the updated operation
1016      */
1017     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
1018         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
1019         operation.setResult(result);
1020         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
1021                         : ControlLoopOperation.FAILED_MSG);
1022
1023         return operation;
1024     }
1025
1026     /**
1027      * Determines if a throwable is due to a timeout.
1028      *
1029      * @param thrown throwable of interest
1030      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
1031      */
1032     protected boolean isTimeout(Throwable thrown) {
1033         if (thrown instanceof CompletionException) {
1034             thrown = thrown.getCause();
1035         }
1036
1037         return (thrown instanceof TimeoutException);
1038     }
1039
1040     /**
1041      * Logs a message. If the message is not of type, String, then it attempts to
1042      * pretty-print it into JSON before logging.
1043      *
1044      * @param direction IN or OUT
1045      * @param infra communication infrastructure on which it was published
1046      * @param source source name (e.g., the URL or Topic name)
1047      * @param message message to be logged
1048      * @return the JSON text that was logged
1049      */
1050     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
1051         String json;
1052         try {
1053             json = prettyPrint(message);
1054
1055         } catch (IllegalArgumentException e) {
1056             String type = (direction == EventType.IN ? "response" : "request");
1057             logger.warn("cannot pretty-print {}", type, e);
1058             json = message.toString();
1059         }
1060
1061         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1062
1063         return json;
1064     }
1065
1066     /**
1067      * Converts a message to a "pretty-printed" String using the operation's normal
1068      * serialization provider (i.e., it's <i>coder</i>).
1069      *
1070      * @param message response to be logged
1071      * @return the JSON text that was logged
1072      * @throws IllegalArgumentException if the message cannot be converted
1073      */
1074     public <T> String prettyPrint(T message) {
1075         if (message == null) {
1076             return null;
1077         } else if (message instanceof String) {
1078             return message.toString();
1079         } else {
1080             try {
1081                 return getCoder().encode(message, true);
1082             } catch (CoderException e) {
1083                 throw new IllegalArgumentException("cannot encode message", e);
1084             }
1085         }
1086     }
1087
1088     // these may be overridden by subclasses or junit tests
1089
1090     /**
1091      * Gets the retry count.
1092      *
1093      * @param retry retry, extracted from the parameters, or {@code null}
1094      * @return the number of retries, or {@code 0} if no retries were specified
1095      */
1096     protected int getRetry(Integer retry) {
1097         return (retry == null ? 0 : retry);
1098     }
1099
1100     /**
1101      * Gets the retry wait, in milliseconds.
1102      *
1103      * @return the retry wait, in milliseconds
1104      */
1105     protected long getRetryWaitMs() {
1106         return DEFAULT_RETRY_WAIT_MS;
1107     }
1108
1109     /**
1110      * Gets the operation timeout.
1111      *
1112      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1113      *        {@code null}
1114      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1115      *         specified
1116      */
1117     protected long getTimeoutMs(Integer timeoutSec) {
1118         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1119     }
1120
1121     // these may be overridden by junit tests
1122
1123     protected Coder getCoder() {
1124         return coder;
1125     }
1126 }