Merge "Remove commons-io dependency from models"
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedHashMap;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Queue;
32 import java.util.UUID;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CompletionException;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39 import java.util.function.BiConsumer;
40 import java.util.function.Function;
41 import java.util.function.Supplier;
42 import java.util.function.UnaryOperator;
43 import lombok.AccessLevel;
44 import lombok.Getter;
45 import lombok.Setter;
46 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
48 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
49 import org.onap.policy.common.utils.coder.Coder;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.controlloop.ControlLoopOperation;
53 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
54 import org.onap.policy.controlloop.actorserviceprovider.Operation;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
57 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
59 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
60 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  * Partial implementation of an operator. In general, it's preferable that subclasses
66  * would override {@link #startOperationAsync(int, OperationOutcome)
67  * startOperationAsync()}. However, if that proves to be too difficult, then they can
68  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
69  * if the operation requires any preprocessor steps, the subclass may choose to override
70  * {@link #startPreprocessorAsync()}.
71  * <p/>
72  * The futures returned by the methods within this class can be canceled, and will
73  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
74  * returned by overridden methods will do the same. Of course, if a class overrides
75  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
76  * be done to cancel that particular operation.
77  * <p/>
78  * In general tasks in a pipeline are executed by the same thread. However, the following
79  * should always be executed via the executor specified in "params":
80  * <ul>
81  * <li>start callback</li>
82  * <li>completion callback</li>
83  * <li>controller completion (i.e., delayedComplete())</li>
84  * </ul>
85  */
86 public abstract class OperationPartial implements Operation {
87     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
88     private static final Coder coder = new StandardCoder();
89
90     public static final String GUARD_ACTOR_NAME = "GUARD";
91     public static final String GUARD_OPERATION_NAME = "Decision";
92     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
93
94     private final OperatorConfig config;
95
96     /**
97      * Operation parameters.
98      */
99     protected final ControlLoopOperationParams params;
100
101     @Getter
102     private final String fullName;
103
104     @Getter
105     @Setter(AccessLevel.PROTECTED)
106     private String subRequestId;
107
108     @Getter
109     private final List<String> propertyNames;
110
111     /**
112      * Values for the properties identified by {@link #getPropertyNames()}.
113      */
114     private final Map<String, Object> properties = new HashMap<>();
115
116
117     /**
118      * Constructs the object.
119      *
120      * @param params operation parameters
121      * @param config configuration for this operation
122      * @param propertyNames names of properties required by this operation
123      */
124     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
125         this.params = params;
126         this.config = config;
127         this.fullName = params.getActor() + "." + params.getOperation();
128         this.propertyNames = propertyNames;
129     }
130
131     public Executor getBlockingExecutor() {
132         return config.getBlockingExecutor();
133     }
134
135     @Override
136     public String getActorName() {
137         return params.getActor();
138     }
139
140     @Override
141     public String getName() {
142         return params.getOperation();
143     }
144
145     @Override
146     public boolean containsProperty(String name) {
147         return properties.containsKey(name);
148     }
149
150     @Override
151     public void setProperty(String name, Object value) {
152         logger.info("{}: set property {}={}", getFullName(), name, value);
153         properties.put(name, value);
154     }
155
156     @SuppressWarnings("unchecked")
157     @Override
158     public <T> T getProperty(String name) {
159         return (T) properties.get(name);
160     }
161
162     @Override
163     public CompletableFuture<OperationOutcome> start() {
164         // allocate a controller for the entire operation
165         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
166
167         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
168         if (preproc == null) {
169             // no preprocessor required - just start the operation
170             return startOperationAttempt(controller, 1);
171         }
172
173         /*
174          * Do preprocessor first and then, if successful, start the operation. Note:
175          * operations create their own outcome, ignoring the outcome from any previous
176          * steps.
177          *
178          * Wrap the preprocessor to ensure "stop" is propagated to it.
179          */
180         // @formatter:off
181         controller.wrap(preproc)
182                         .exceptionally(fromException("preprocessor of operation"))
183                         .thenCompose(handlePreprocessorFailure(controller))
184                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
185                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
186         // @formatter:on
187
188         return controller;
189     }
190
191     /**
192      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
193      * invokes the call-backs, marks the controller complete, and returns an incomplete
194      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
195      * received.
196      * <p/>
197      * Assumes that no callbacks have been invoked yet.
198      *
199      * @param controller pipeline controller
200      * @return a function that checks the outcome status and continues, if successful, or
201      *         indicates a failure otherwise
202      */
203     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
204                     PipelineControllerFuture<OperationOutcome> controller) {
205
206         return outcome -> {
207
208             if (isSuccess(outcome)) {
209                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
210                 return CompletableFuture.completedFuture(outcome);
211             }
212
213             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
214
215             final Executor executor = params.getExecutor();
216             final CallbackManager callbacks = new CallbackManager();
217
218             // propagate "stop" to the callbacks
219             controller.add(callbacks);
220
221             final OperationOutcome outcome2 = params.makeOutcome(getTargetEntity());
222
223             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
224
225             outcome2.setFinalOutcome(true);
226             outcome2.setResult(OperationResult.FAILURE_GUARD);
227             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
228
229             // @formatter:off
230             CompletableFuture.completedFuture(outcome2)
231                             .whenCompleteAsync(callbackStarted(callbacks), executor)
232                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
233                             .whenCompleteAsync(controller.delayedComplete(), executor);
234             // @formatter:on
235
236             return new CompletableFuture<>();
237         };
238     }
239
240     /**
241      * Invokes the operation's preprocessor step(s) as a "future". This method simply
242      * returns {@code null}.
243      * <p/>
244      * This method assumes the following:
245      * <ul>
246      * <li>the operator is alive</li>
247      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
248      * </ul>
249      *
250      * @return a function that will start the preprocessor and returns its outcome, or
251      *         {@code null} if this operation needs no preprocessor
252      */
253     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
254         return null;
255     }
256
257     /**
258      * Invokes the operation's guard step(s) as a "future".
259      * <p/>
260      * This method assumes the following:
261      * <ul>
262      * <li>the operator is alive</li>
263      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
264      * </ul>
265      *
266      * @return a function that will start the guard checks and returns its outcome, or
267      *         {@code null} if this operation has no guard
268      */
269     protected CompletableFuture<OperationOutcome> startGuardAsync() {
270         if (params.isPreprocessed()) {
271             return null;
272         }
273
274         // get the guard payload
275         Map<String, Object> payload = makeGuardPayload();
276
277         /*
278          * Note: can't use constants from actor.guard, because that would create a
279          * circular dependency.
280          */
281         return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
282                         .payload(payload).build().start();
283     }
284
285     /**
286      * Creates a payload to execute a guard operation.
287      *
288      * @return a new guard payload
289      */
290     protected Map<String, Object> makeGuardPayload() {
291         // TODO delete this once preprocessing is done by the application
292         Map<String, Object> guard = new LinkedHashMap<>();
293         guard.put("actor", params.getActor());
294         guard.put("operation", params.getOperation());
295         guard.put("target", getTargetEntity());
296         guard.put("requestId", params.getRequestId());
297
298         String clname = params.getContext().getEvent().getClosedLoopControlName();
299         if (clname != null) {
300             guard.put("clname", clname);
301         }
302
303         return guard;
304     }
305
306     /**
307      * Starts the operation attempt, with no preprocessor. When all retries complete, it
308      * will complete the controller.
309      *
310      * @param controller controller for all operation attempts
311      * @param attempt attempt number, typically starting with 1
312      * @return a future that will return the final result of all attempts
313      */
314     private CompletableFuture<OperationOutcome> startOperationAttempt(
315                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
316
317         generateSubRequestId(attempt);
318
319         // propagate "stop" to the operation attempt
320         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
321                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
322
323         return controller;
324     }
325
326     /**
327      * Generates and sets {@link #subRequestId} to a new subrequest ID.
328      *
329      * @param attempt attempt number, typically starting with 1
330      */
331     public void generateSubRequestId(int attempt) {
332         // Note: this should be "protected", but that makes junits much messier
333
334         setSubRequestId(UUID.randomUUID().toString());
335     }
336
337     /**
338      * Starts the operation attempt, without doing any retries.
339      *
340      * @param params operation parameters
341      * @param attempt attempt number, typically starting with 1
342      * @return a future that will return the result of a single operation attempt
343      */
344     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
345
346         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
347
348         final Executor executor = params.getExecutor();
349         final OperationOutcome outcome = params.makeOutcome(getTargetEntity());
350         final CallbackManager callbacks = new CallbackManager();
351
352         // this operation attempt gets its own controller
353         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
354
355         // propagate "stop" to the callbacks
356         controller.add(callbacks);
357
358         // @formatter:off
359         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
360                         .whenCompleteAsync(callbackStarted(callbacks), executor)
361                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
362         // @formatter:on
363
364         // handle timeouts, if specified
365         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
366         if (timeoutMillis > 0) {
367             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
368             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
369         }
370
371         /*
372          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
373          * before callbackCompleted() is invoked.
374          *
375          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
376          * the pipeline as the last step anyway.
377          */
378
379         // @formatter:off
380         future.exceptionally(fromException("operation"))
381                     .thenApply(setRetryFlag(attempt))
382                     .whenCompleteAsync(callbackStarted(callbacks), executor)
383                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
384                     .whenCompleteAsync(controller.delayedComplete(), executor);
385         // @formatter:on
386
387         return controller;
388     }
389
390     /**
391      * Determines if the outcome was successful.
392      *
393      * @param outcome outcome to examine
394      * @return {@code true} if the outcome was successful
395      */
396     protected boolean isSuccess(OperationOutcome outcome) {
397         return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
398     }
399
400     /**
401      * Determines if the outcome was a failure for this operator.
402      *
403      * @param outcome outcome to examine, or {@code null}
404      * @return {@code true} if the outcome is not {@code null} and was a failure
405      *         <i>and</i> was associated with this operator, {@code false} otherwise
406      */
407     protected boolean isActorFailed(OperationOutcome outcome) {
408         return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
409     }
410
411     /**
412      * Determines if the given outcome is for this operation.
413      *
414      * @param outcome outcome to examine
415      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
416      */
417     protected boolean isSameOperation(OperationOutcome outcome) {
418         return OperationOutcome.isFor(outcome, getActorName(), getName());
419     }
420
421     /**
422      * Invokes the operation as a "future". This method simply invokes
423      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
424      * returning the result via a "future".
425      * <p/>
426      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
427      * the executor in the "params", as that may bring the background thread pool to a
428      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
429      * instead.
430      * <p/>
431      * This method assumes the following:
432      * <ul>
433      * <li>the operator is alive</li>
434      * <li>verifyRunning() has been invoked</li>
435      * <li>callbackStarted() has been invoked</li>
436      * <li>the invoker will perform appropriate timeout checks</li>
437      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
438      * </ul>
439      *
440      * @param attempt attempt number, typically starting with 1
441      * @return a function that will start the operation and return its result when
442      *         complete
443      */
444     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
445
446         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
447     }
448
449     /**
450      * Low-level method that performs the operation. This can make the same assumptions
451      * that are made by {@link #doOperationAsFuture()}. This particular method simply
452      * throws an {@link UnsupportedOperationException}.
453      *
454      * @param attempt attempt number, typically starting with 1
455      * @param operation the operation being performed
456      * @return the outcome of the operation
457      */
458     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
459
460         throw new UnsupportedOperationException("start operation " + getFullName());
461     }
462
463     /**
464      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
465      * FAILURE, assuming the policy specifies retries and the retry count has been
466      * exhausted.
467      *
468      * @param attempt latest attempt number, starting with 1
469      * @return a function to get the next future to execute
470      */
471     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
472
473         return origOutcome -> {
474             // ensure we have a non-null outcome
475             OperationOutcome outcome;
476             if (origOutcome != null) {
477                 outcome = origOutcome;
478             } else {
479                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
480                 outcome = this.setOutcome(params.makeOutcome(getTargetEntity()), OperationResult.FAILURE);
481             }
482
483             // ensure correct actor/operation
484             outcome.setActor(getActorName());
485             outcome.setOperation(getName());
486
487             // determine if we should retry, based on the result
488             if (outcome.getResult() != OperationResult.FAILURE) {
489                 // do not retry success or other failure types (e.g., exception)
490                 outcome.setFinalOutcome(true);
491                 return outcome;
492             }
493
494             int retry = getRetry(params.getRetry());
495             if (retry <= 0) {
496                 // no retries were specified
497                 outcome.setFinalOutcome(true);
498
499             } else if (attempt <= retry) {
500                 // have more retries - not the final outcome
501                 outcome.setFinalOutcome(false);
502
503             } else {
504                 /*
505                  * retries were specified and we've already tried them all - change to
506                  * FAILURE_RETRIES
507                  */
508                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
509                 outcome.setResult(OperationResult.FAILURE_RETRIES);
510                 outcome.setFinalOutcome(true);
511             }
512
513             return outcome;
514         };
515     }
516
517     /**
518      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
519      * was previously invoked, and thus that the "operation" is not {@code null}.
520      *
521      * @param controller controller for all of the retries
522      * @param attempt latest attempt number, starting with 1
523      * @return a function to get the next future to execute
524      */
525     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
526                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
527
528         return operation -> {
529             if (!isActorFailed(operation)) {
530                 // wrong type or wrong operation - just leave it as is
531                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
532                 controller.complete(operation);
533                 return new CompletableFuture<>();
534             }
535
536             if (getRetry(params.getRetry()) <= 0) {
537                 // no retries - already marked as FAILURE, so just return it
538                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
539                 controller.complete(operation);
540                 return new CompletableFuture<>();
541             }
542
543             /*
544              * Retry the operation.
545              */
546             long waitMs = getRetryWaitMs();
547             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
548
549             return sleep(waitMs, TimeUnit.MILLISECONDS)
550                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
551         };
552     }
553
554     /**
555      * Convenience method that starts a sleep(), running via a future.
556      *
557      * @param sleepTime time to sleep
558      * @param unit time unit
559      * @return a future that will complete when the sleep completes
560      */
561     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
562         if (sleepTime <= 0) {
563             return CompletableFuture.completedFuture(null);
564         }
565
566         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
567     }
568
569     /**
570      * Converts an exception into an operation outcome, returning a copy of the outcome to
571      * prevent background jobs from changing it.
572      *
573      * @param type type of item throwing the exception
574      * @return a function that will convert an exception into an operation outcome
575      */
576     private Function<Throwable, OperationOutcome> fromException(String type) {
577
578         return thrown -> {
579             OperationOutcome outcome = params.makeOutcome(getTargetEntity());
580
581             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
582                 // do not include exception in the message, as it just clutters the log
583                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
584                                 params.getRequestId());
585             } else {
586                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
587                                 params.getRequestId(), thrown);
588             }
589
590             return setOutcome(outcome, thrown);
591         };
592     }
593
594     /**
595      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
596      * any outstanding futures when one completes.
597      *
598      * @param futureMakers function to make a future. If the function returns
599      *        {@code null}, then no future is created for that function. On the other
600      *        hand, if the function throws an exception, then the previously created
601      *        functions are canceled and the exception is re-thrown
602      * @return a future to cancel or await an outcome, or {@code null} if no futures were
603      *         created. If this future is canceled, then all of the futures will be
604      *         canceled
605      */
606     public CompletableFuture<OperationOutcome> anyOf(
607                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
608
609         return anyOf(Arrays.asList(futureMakers));
610     }
611
612     /**
613      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
614      * any outstanding futures when one completes.
615      *
616      * @param futureMakers function to make a future. If the function returns
617      *        {@code null}, then no future is created for that function. On the other
618      *        hand, if the function throws an exception, then the previously created
619      *        functions are canceled and the exception is re-thrown
620      * @return a future to cancel or await an outcome, or {@code null} if no futures were
621      *         created. If this future is canceled, then all of the futures will be
622      *         canceled. Similarly, when this future completes, any incomplete futures
623      *         will be canceled
624      */
625     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
626
627         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
628
629         CompletableFuture<OperationOutcome>[] futures =
630                         attachFutures(controller, futureMakers, UnaryOperator.identity());
631
632         if (futures.length == 0) {
633             // no futures were started
634             return null;
635         }
636
637         if (futures.length == 1) {
638             return futures[0];
639         }
640
641         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
642                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
643
644         return controller;
645     }
646
647     /**
648      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
649      *
650      * @param futureMakers function to make a future. If the function returns
651      *        {@code null}, then no future is created for that function. On the other
652      *        hand, if the function throws an exception, then the previously created
653      *        functions are canceled and the exception is re-thrown
654      * @return a future to cancel or await an outcome, or {@code null} if no futures were
655      *         created. If this future is canceled, then all of the futures will be
656      *         canceled
657      */
658     public CompletableFuture<OperationOutcome> allOf(
659                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
660
661         return allOf(Arrays.asList(futureMakers));
662     }
663
664     /**
665      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
666      *
667      * @param futureMakers function to make a future. If the function returns
668      *        {@code null}, then no future is created for that function. On the other
669      *        hand, if the function throws an exception, then the previously created
670      *        functions are canceled and the exception is re-thrown
671      * @return a future to cancel or await an outcome, or {@code null} if no futures were
672      *         created. If this future is canceled, then all of the futures will be
673      *         canceled. Similarly, when this future completes, any incomplete futures
674      *         will be canceled
675      */
676     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
677         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
678
679         Queue<OperationOutcome> outcomes = new LinkedList<>();
680
681         CompletableFuture<OperationOutcome>[] futures =
682                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
683                             synchronized (outcomes) {
684                                 outcomes.add(outcome);
685                             }
686                             return outcome;
687                         }));
688
689         if (futures.length == 0) {
690             // no futures were started
691             return null;
692         }
693
694         if (futures.length == 1) {
695             return futures[0];
696         }
697
698         // @formatter:off
699         CompletableFuture.allOf(futures)
700                         .thenApply(unused -> combineOutcomes(outcomes))
701                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
702         // @formatter:on
703
704         return controller;
705     }
706
707     /**
708      * Invokes the functions to create the futures and attaches them to the controller.
709      *
710      * @param controller master controller for all of the futures
711      * @param futureMakers futures to be attached to the controller
712      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
713      *        Returns the adorned future
714      * @return an array of futures, possibly zero-length. If the array is of size one,
715      *         then that one item should be returned instead of the controller
716      */
717     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
718                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
719                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
720
721         if (futureMakers.isEmpty()) {
722             @SuppressWarnings("unchecked")
723             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
724             return result;
725         }
726
727         // the last, unadorned future that is created
728         CompletableFuture<OperationOutcome> lastFuture = null;
729
730         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
731
732         // make each future
733         for (var maker : futureMakers) {
734             try {
735                 CompletableFuture<OperationOutcome> future = maker.get();
736                 if (future == null) {
737                     continue;
738                 }
739
740                 // propagate "stop" to the future
741                 controller.add(future);
742
743                 futures.add(adorn.apply(future));
744
745                 lastFuture = future;
746
747             } catch (RuntimeException e) {
748                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
749                 controller.cancel(false);
750                 throw e;
751             }
752         }
753
754         @SuppressWarnings("unchecked")
755         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
756
757         if (result.length == 1) {
758             // special case - return the unadorned future
759             result[0] = lastFuture;
760             return result;
761         }
762
763         return futures.toArray(result);
764     }
765
766     /**
767      * Combines the outcomes from a set of tasks.
768      *
769      * @param outcomes outcomes to be examined
770      * @return the combined outcome
771      */
772     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
773
774         // identify the outcome with the highest priority
775         OperationOutcome outcome = outcomes.remove();
776         int priority = detmPriority(outcome);
777
778         for (OperationOutcome outcome2 : outcomes) {
779             int priority2 = detmPriority(outcome2);
780
781             if (priority2 > priority) {
782                 outcome = outcome2;
783                 priority = priority2;
784             }
785         }
786
787         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
788                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
789
790         return outcome;
791     }
792
793     /**
794      * Determines the priority of an outcome based on its result.
795      *
796      * @param outcome outcome to examine, or {@code null}
797      * @return the outcome's priority
798      */
799     protected int detmPriority(OperationOutcome outcome) {
800         if (outcome == null || outcome.getResult() == null) {
801             return 1;
802         }
803
804         switch (outcome.getResult()) {
805             case SUCCESS:
806                 return 0;
807
808             case FAILURE_GUARD:
809                 return 2;
810
811             case FAILURE_RETRIES:
812                 return 3;
813
814             case FAILURE:
815                 return 4;
816
817             case FAILURE_TIMEOUT:
818                 return 5;
819
820             case FAILURE_EXCEPTION:
821             default:
822                 return 6;
823         }
824     }
825
826     /**
827      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
828      * not created until the previous task completes. The pipeline returns the outcome of
829      * the last task executed.
830      *
831      * @param futureMakers functions to make the futures
832      * @return a future to cancel the sequence or await the outcome
833      */
834     public CompletableFuture<OperationOutcome> sequence(
835                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
836
837         return sequence(Arrays.asList(futureMakers));
838     }
839
840     /**
841      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
842      * not created until the previous task completes. The pipeline returns the outcome of
843      * the last task executed.
844      *
845      * @param futureMakers functions to make the futures
846      * @return a future to cancel the sequence or await the outcome, or {@code null} if
847      *         there were no tasks to perform
848      */
849     public CompletableFuture<OperationOutcome> sequence(
850                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
851
852         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
853
854         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
855         if (nextTask == null) {
856             // no tasks
857             return null;
858         }
859
860         if (queue.isEmpty()) {
861             // only one task - just return it rather than wrapping it in a controller
862             return nextTask;
863         }
864
865         /*
866          * multiple tasks - need a controller to stop whichever task is currently
867          * executing
868          */
869         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
870         final Executor executor = params.getExecutor();
871
872         // @formatter:off
873         controller.wrap(nextTask)
874                     .thenCompose(nextTaskOnSuccess(controller, queue))
875                     .whenCompleteAsync(controller.delayedComplete(), executor);
876         // @formatter:on
877
878         return controller;
879     }
880
881     /**
882      * Executes the next task in the queue, if the previous outcome was successful.
883      *
884      * @param controller pipeline controller
885      * @param taskQueue queue of tasks to be performed
886      * @return a future to execute the remaining tasks, or the current outcome, if it's a
887      *         failure, or if there are no more tasks
888      */
889     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
890                     PipelineControllerFuture<OperationOutcome> controller,
891                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
892
893         return outcome -> {
894             if (!isSuccess(outcome)) {
895                 // return the failure
896                 return CompletableFuture.completedFuture(outcome);
897             }
898
899             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
900             if (nextTask == null) {
901                 // no tasks - just return the success
902                 return CompletableFuture.completedFuture(outcome);
903             }
904
905             // @formatter:off
906             return controller
907                         .wrap(nextTask)
908                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
909             // @formatter:on
910         };
911     }
912
913     /**
914      * Gets the next task from the queue, skipping those that are {@code null}.
915      *
916      * @param taskQueue task queue
917      * @return the next task, or {@code null} if the queue is now empty
918      */
919     private CompletableFuture<OperationOutcome> getNextTask(
920                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
921
922         Supplier<CompletableFuture<OperationOutcome>> maker;
923
924         while ((maker = taskQueue.poll()) != null) {
925             CompletableFuture<OperationOutcome> future = maker.get();
926             if (future != null) {
927                 return future;
928             }
929         }
930
931         return null;
932     }
933
934     /**
935      * Sets the start time of the operation and invokes the callback to indicate that the
936      * operation has started. Does nothing if the pipeline has been stopped.
937      * <p/>
938      * This assumes that the "outcome" is not {@code null}.
939      *
940      * @param callbacks used to determine if the start callback can be invoked
941      * @return a function that sets the start time and invokes the callback
942      */
943     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
944
945         return (outcome, thrown) -> {
946
947             if (callbacks.canStart()) {
948                 outcome.setSubRequestId(getSubRequestId());
949                 outcome.setStart(callbacks.getStartTime());
950                 outcome.setEnd(null);
951
952                 // pass a copy to the callback
953                 OperationOutcome outcome2 = new OperationOutcome(outcome);
954                 outcome2.setFinalOutcome(false);
955                 params.callbackStarted(outcome2);
956             }
957         };
958     }
959
960     /**
961      * Sets the end time of the operation and invokes the callback to indicate that the
962      * operation has completed. Does nothing if the pipeline has been stopped.
963      * <p/>
964      * This assumes that the "outcome" is not {@code null}.
965      * <p/>
966      * Note: the start time must be a reference rather than a plain value, because it's
967      * value must be gotten on-demand, when the returned function is executed at a later
968      * time.
969      *
970      * @param callbacks used to determine if the end callback can be invoked
971      * @return a function that sets the end time and invokes the callback
972      */
973     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
974
975         return (outcome, thrown) -> {
976             if (callbacks.canEnd()) {
977                 outcome.setSubRequestId(getSubRequestId());
978                 outcome.setStart(callbacks.getStartTime());
979                 outcome.setEnd(callbacks.getEndTime());
980
981                 // pass a copy to the callback
982                 params.callbackCompleted(new OperationOutcome(outcome));
983             }
984         };
985     }
986
987     /**
988      * Sets an operation's outcome and message, based on a throwable.
989      *
990      * @param operation operation to be updated
991      * @return the updated operation
992      */
993     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
994         OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
995                 : OperationResult.FAILURE_EXCEPTION);
996         return setOutcome(operation, result);
997     }
998
999     /**
1000      * Sets an operation's outcome and default message based on the result.
1001      *
1002      * @param operation operation to be updated
1003      * @param result result of the operation
1004      * @return the updated operation
1005      */
1006     public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
1007         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
1008         operation.setResult(result);
1009         operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
1010                         : ControlLoopOperation.FAILED_MSG);
1011
1012         return operation;
1013     }
1014
1015     /**
1016      * Determines if a throwable is due to a timeout.
1017      *
1018      * @param thrown throwable of interest
1019      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
1020      */
1021     protected boolean isTimeout(Throwable thrown) {
1022         if (thrown instanceof CompletionException) {
1023             thrown = thrown.getCause();
1024         }
1025
1026         return (thrown instanceof TimeoutException);
1027     }
1028
1029     /**
1030      * Logs a message. If the message is not of type, String, then it attempts to
1031      * pretty-print it into JSON before logging.
1032      *
1033      * @param direction IN or OUT
1034      * @param infra communication infrastructure on which it was published
1035      * @param source source name (e.g., the URL or Topic name)
1036      * @param message message to be logged
1037      * @return the JSON text that was logged
1038      */
1039     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
1040         String json;
1041         try {
1042             json = prettyPrint(message);
1043
1044         } catch (IllegalArgumentException e) {
1045             String type = (direction == EventType.IN ? "response" : "request");
1046             logger.warn("cannot pretty-print {}", type, e);
1047             json = message.toString();
1048         }
1049
1050         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1051
1052         return json;
1053     }
1054
1055     /**
1056      * Converts a message to a "pretty-printed" String using the operation's normal
1057      * serialization provider (i.e., it's <i>coder</i>).
1058      *
1059      * @param message response to be logged
1060      * @return the JSON text that was logged
1061      * @throws IllegalArgumentException if the message cannot be converted
1062      */
1063     public <T> String prettyPrint(T message) {
1064         if (message == null) {
1065             return null;
1066         } else if (message instanceof String) {
1067             return message.toString();
1068         } else {
1069             try {
1070                 return getCoder().encode(message, true);
1071             } catch (CoderException e) {
1072                 throw new IllegalArgumentException("cannot encode message", e);
1073             }
1074         }
1075     }
1076
1077     // these may be overridden by subclasses or junit tests
1078
1079     /**
1080      * Gets the retry count.
1081      *
1082      * @param retry retry, extracted from the parameters, or {@code null}
1083      * @return the number of retries, or {@code 0} if no retries were specified
1084      */
1085     protected int getRetry(Integer retry) {
1086         return (retry == null ? 0 : retry);
1087     }
1088
1089     /**
1090      * Gets the retry wait, in milliseconds.
1091      *
1092      * @return the retry wait, in milliseconds
1093      */
1094     protected long getRetryWaitMs() {
1095         return DEFAULT_RETRY_WAIT_MS;
1096     }
1097
1098     /**
1099      * Gets the target entity, first trying the properties and then the parameters.
1100      *
1101      * @return the target entity
1102      */
1103     protected String getTargetEntity() {
1104         String targetEntity = getProperty(OperationProperties.AAI_TARGET_ENTITY);
1105         return (targetEntity != null ? targetEntity : params.getTargetEntity());
1106     }
1107
1108     /**
1109      * Gets the operation timeout.
1110      *
1111      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1112      *        {@code null}
1113      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1114      *         specified
1115      */
1116     protected long getTimeoutMs(Integer timeoutSec) {
1117         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1118     }
1119
1120     // these may be overridden by junit tests
1121
1122     protected Coder getCoder() {
1123         return coder;
1124     }
1125 }