More actor clean-up
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.CompletionException;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.function.BiConsumer;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37 import java.util.function.UnaryOperator;
38 import lombok.Getter;
39 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
40 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
41 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
42 import org.onap.policy.common.utils.coder.Coder;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.controlloop.ControlLoopOperation;
46 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
47 import org.onap.policy.controlloop.actorserviceprovider.Operation;
48 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
49 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
50 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
51 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
52 import org.onap.policy.controlloop.policy.PolicyResult;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Partial implementation of an operator. In general, it's preferable that subclasses
58  * would override {@link #startOperationAsync(int, OperationOutcome)
59  * startOperationAsync()}. However, if that proves to be too difficult, then they can
60  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
61  * if the operation requires any preprocessor steps, the subclass may choose to override
62  * {@link #startPreprocessorAsync()}.
63  * <p/>
64  * The futures returned by the methods within this class can be canceled, and will
65  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
66  * returned by overridden methods will do the same. Of course, if a class overrides
67  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
68  * be done to cancel that particular operation.
69  */
70 public abstract class OperationPartial implements Operation {
71     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
72     private static final Coder coder = new StandardCoder();
73
74     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
75
76     private final OperatorConfig config;
77
78     /**
79      * Operation parameters.
80      */
81     protected final ControlLoopOperationParams params;
82
83     @Getter
84     private final String fullName;
85
86
87     /**
88      * Constructs the object.
89      *
90      * @param params operation parameters
91      * @param config configuration for this operation
92      */
93     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
94         this.params = params;
95         this.config = config;
96         this.fullName = params.getActor() + "." + params.getOperation();
97     }
98
99     public Executor getBlockingExecutor() {
100         return config.getBlockingExecutor();
101     }
102
103     public String getActorName() {
104         return params.getActor();
105     }
106
107     public String getName() {
108         return params.getOperation();
109     }
110
111     @Override
112     public final CompletableFuture<OperationOutcome> start() {
113         // allocate a controller for the entire operation
114         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
115
116         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
117         if (preproc == null) {
118             // no preprocessor required - just start the operation
119             return startOperationAttempt(controller, 1);
120         }
121
122         /*
123          * Do preprocessor first and then, if successful, start the operation. Note:
124          * operations create their own outcome, ignoring the outcome from any previous
125          * steps.
126          *
127          * Wrap the preprocessor to ensure "stop" is propagated to it.
128          */
129         // @formatter:off
130         controller.wrap(preproc)
131                         .exceptionally(fromException("preprocessor of operation"))
132                         .thenCompose(handlePreprocessorFailure(controller))
133                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
134                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
135         // @formatter:on
136
137         return controller;
138     }
139
140     /**
141      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
142      * invokes the call-backs, marks the controller complete, and returns an incomplete
143      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
144      * received.
145      * <p/>
146      * Assumes that no callbacks have been invoked yet.
147      *
148      * @param controller pipeline controller
149      * @return a function that checks the outcome status and continues, if successful, or
150      *         indicates a failure otherwise
151      */
152     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
153                     PipelineControllerFuture<OperationOutcome> controller) {
154
155         return outcome -> {
156
157             if (outcome != null && isSuccess(outcome)) {
158                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
159                 return CompletableFuture.completedFuture(outcome);
160             }
161
162             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
163
164             final Executor executor = params.getExecutor();
165             final CallbackManager callbacks = new CallbackManager();
166
167             // propagate "stop" to the callbacks
168             controller.add(callbacks);
169
170             final OperationOutcome outcome2 = params.makeOutcome();
171
172             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
173
174             outcome2.setResult(PolicyResult.FAILURE_GUARD);
175             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
176
177             // @formatter:off
178             CompletableFuture.completedFuture(outcome2)
179                             .whenCompleteAsync(callbackStarted(callbacks), executor)
180                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
181                             .whenCompleteAsync(controller.delayedComplete(), executor);
182             // @formatter:on
183
184             return new CompletableFuture<>();
185         };
186     }
187
188     /**
189      * Invokes the operation's preprocessor step(s) as a "future". This method simply
190      * invokes {@link #startGuardAsync()}.
191      * <p/>
192      * This method assumes the following:
193      * <ul>
194      * <li>the operator is alive</li>
195      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
196      * </ul>
197      *
198      * @return a function that will start the preprocessor and returns its outcome, or
199      *         {@code null} if this operation needs no preprocessor
200      */
201     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
202         return startGuardAsync();
203     }
204
205     /**
206      * Invokes the operation's guard step(s) as a "future". This method simply returns
207      * {@code null}.
208      * <p/>
209      * This method assumes the following:
210      * <ul>
211      * <li>the operator is alive</li>
212      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
213      * </ul>
214      *
215      * @return a function that will start the guard checks and returns its outcome, or
216      *         {@code null} if this operation has no guard
217      */
218     protected CompletableFuture<OperationOutcome> startGuardAsync() {
219         return null;
220     }
221
222     /**
223      * Starts the operation attempt, with no preprocessor. When all retries complete, it
224      * will complete the controller.
225      *
226      * @param controller controller for all operation attempts
227      * @param attempt attempt number, typically starting with 1
228      * @return a future that will return the final result of all attempts
229      */
230     private CompletableFuture<OperationOutcome> startOperationAttempt(
231                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
232
233         // propagate "stop" to the operation attempt
234         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
235                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
236
237         return controller;
238     }
239
240     /**
241      * Starts the operation attempt, without doing any retries.
242      *
243      * @param params operation parameters
244      * @param attempt attempt number, typically starting with 1
245      * @return a future that will return the result of a single operation attempt
246      */
247     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
248
249         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
250
251         final Executor executor = params.getExecutor();
252         final OperationOutcome outcome = params.makeOutcome();
253         final CallbackManager callbacks = new CallbackManager();
254
255         // this operation attempt gets its own controller
256         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
257
258         // propagate "stop" to the callbacks
259         controller.add(callbacks);
260
261         // @formatter:off
262         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
263                         .whenCompleteAsync(callbackStarted(callbacks), executor)
264                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
265         // @formatter:on
266
267         // handle timeouts, if specified
268         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
269         if (timeoutMillis > 0) {
270             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
271             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
272         }
273
274         /*
275          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
276          * before callbackCompleted() is invoked.
277          *
278          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
279          * the pipeline as the last step anyway.
280          */
281
282         // @formatter:off
283         future.exceptionally(fromException("operation"))
284                     .thenApply(setRetryFlag(attempt))
285                     .whenCompleteAsync(callbackStarted(callbacks), executor)
286                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
287                     .whenCompleteAsync(controller.delayedComplete(), executor);
288         // @formatter:on
289
290         return controller;
291     }
292
293     /**
294      * Determines if the outcome was successful.
295      *
296      * @param outcome outcome to examine
297      * @return {@code true} if the outcome was successful
298      */
299     protected boolean isSuccess(OperationOutcome outcome) {
300         return (outcome.getResult() == PolicyResult.SUCCESS);
301     }
302
303     /**
304      * Determines if the outcome was a failure for this operator.
305      *
306      * @param outcome outcome to examine, or {@code null}
307      * @return {@code true} if the outcome is not {@code null} and was a failure
308      *         <i>and</i> was associated with this operator, {@code false} otherwise
309      */
310     protected boolean isActorFailed(OperationOutcome outcome) {
311         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
312     }
313
314     /**
315      * Determines if the given outcome is for this operation.
316      *
317      * @param outcome outcome to examine
318      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
319      */
320     protected boolean isSameOperation(OperationOutcome outcome) {
321         return OperationOutcome.isFor(outcome, getActorName(), getName());
322     }
323
324     /**
325      * Invokes the operation as a "future". This method simply invokes
326      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
327      * returning the result via a "future".
328      * <p/>
329      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
330      * the executor in the "params", as that may bring the background thread pool to a
331      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
332      * instead.
333      * <p/>
334      * This method assumes the following:
335      * <ul>
336      * <li>the operator is alive</li>
337      * <li>verifyRunning() has been invoked</li>
338      * <li>callbackStarted() has been invoked</li>
339      * <li>the invoker will perform appropriate timeout checks</li>
340      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
341      * </ul>
342      *
343      * @param attempt attempt number, typically starting with 1
344      * @return a function that will start the operation and return its result when
345      *         complete
346      */
347     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
348
349         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
350     }
351
352     /**
353      * Low-level method that performs the operation. This can make the same assumptions
354      * that are made by {@link #doOperationAsFuture()}. This particular method simply
355      * throws an {@link UnsupportedOperationException}.
356      *
357      * @param attempt attempt number, typically starting with 1
358      * @param operation the operation being performed
359      * @return the outcome of the operation
360      */
361     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
362
363         throw new UnsupportedOperationException("start operation " + getFullName());
364     }
365
366     /**
367      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
368      * FAILURE, assuming the policy specifies retries and the retry count has been
369      * exhausted.
370      *
371      * @param attempt latest attempt number, starting with 1
372      * @return a function to get the next future to execute
373      */
374     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
375
376         return operation -> {
377             if (operation != null && !isActorFailed(operation)) {
378                 /*
379                  * wrong type or wrong operation - just leave it as is. No need to log
380                  * anything here, as retryOnFailure() will log a message
381                  */
382                 return operation;
383             }
384
385             // get a non-null operation
386             OperationOutcome oper2;
387             if (operation != null) {
388                 oper2 = operation;
389             } else {
390                 oper2 = params.makeOutcome();
391                 oper2.setResult(PolicyResult.FAILURE);
392             }
393
394             int retry = getRetry(params.getRetry());
395             if (retry > 0 && attempt > retry) {
396                 /*
397                  * retries were specified and we've already tried them all - change to
398                  * FAILURE_RETRIES
399                  */
400                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
401                 oper2.setResult(PolicyResult.FAILURE_RETRIES);
402             }
403
404             return oper2;
405         };
406     }
407
408     /**
409      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
410      * was previously invoked, and thus that the "operation" is not {@code null}.
411      *
412      * @param controller controller for all of the retries
413      * @param attempt latest attempt number, starting with 1
414      * @return a function to get the next future to execute
415      */
416     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
417                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
418
419         return operation -> {
420             if (!isActorFailed(operation)) {
421                 // wrong type or wrong operation - just leave it as is
422                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
423                 controller.complete(operation);
424                 return new CompletableFuture<>();
425             }
426
427             if (getRetry(params.getRetry()) <= 0) {
428                 // no retries - already marked as FAILURE, so just return it
429                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
430                 controller.complete(operation);
431                 return new CompletableFuture<>();
432             }
433
434             /*
435              * Retry the operation.
436              */
437             long waitMs = getRetryWaitMs();
438             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
439
440             return sleep(waitMs, TimeUnit.MILLISECONDS)
441                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
442         };
443     }
444
445     /**
446      * Convenience method that starts a sleep(), running via a future.
447      *
448      * @param sleepTime time to sleep
449      * @param unit time unit
450      * @return a future that will complete when the sleep completes
451      */
452     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
453         if (sleepTime <= 0) {
454             return CompletableFuture.completedFuture(null);
455         }
456
457         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
458     }
459
460     /**
461      * Converts an exception into an operation outcome, returning a copy of the outcome to
462      * prevent background jobs from changing it.
463      *
464      * @param type type of item throwing the exception
465      * @return a function that will convert an exception into an operation outcome
466      */
467     private Function<Throwable, OperationOutcome> fromException(String type) {
468
469         return thrown -> {
470             OperationOutcome outcome = params.makeOutcome();
471
472             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
473                             params.getRequestId(), thrown);
474
475             return setOutcome(outcome, thrown);
476         };
477     }
478
479     /**
480      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
481      * any outstanding futures when one completes.
482      *
483      * @param futureMakers function to make a future. If the function returns
484      *        {@code null}, then no future is created for that function. On the other
485      *        hand, if the function throws an exception, then the previously created
486      *        functions are canceled and the exception is re-thrown
487      * @return a future to cancel or await an outcome, or {@code null} if no futures were
488      *         created. If this future is canceled, then all of the futures will be
489      *         canceled
490      */
491     protected CompletableFuture<OperationOutcome> anyOf(
492                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
493
494         return anyOf(Arrays.asList(futureMakers));
495     }
496
497     /**
498      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
499      * any outstanding futures when one completes.
500      *
501      * @param futureMakers function to make a future. If the function returns
502      *        {@code null}, then no future is created for that function. On the other
503      *        hand, if the function throws an exception, then the previously created
504      *        functions are canceled and the exception is re-thrown
505      * @return a future to cancel or await an outcome, or {@code null} if no futures were
506      *         created. If this future is canceled, then all of the futures will be
507      *         canceled. Similarly, when this future completes, any incomplete futures
508      *         will be canceled
509      */
510     protected CompletableFuture<OperationOutcome> anyOf(
511                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
512
513         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
514
515         CompletableFuture<OperationOutcome>[] futures =
516                         attachFutures(controller, futureMakers, UnaryOperator.identity());
517
518         if (futures.length == 0) {
519             // no futures were started
520             return null;
521         }
522
523         if (futures.length == 1) {
524             return futures[0];
525         }
526
527         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
528                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
529
530         return controller;
531     }
532
533     /**
534      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
535      *
536      * @param futureMakers function to make a future. If the function returns
537      *        {@code null}, then no future is created for that function. On the other
538      *        hand, if the function throws an exception, then the previously created
539      *        functions are canceled and the exception is re-thrown
540      * @return a future to cancel or await an outcome, or {@code null} if no futures were
541      *         created. If this future is canceled, then all of the futures will be
542      *         canceled
543      */
544     protected CompletableFuture<OperationOutcome> allOf(
545                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
546
547         return allOf(Arrays.asList(futureMakers));
548     }
549
550     /**
551      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
552      *
553      * @param futureMakers function to make a future. If the function returns
554      *        {@code null}, then no future is created for that function. On the other
555      *        hand, if the function throws an exception, then the previously created
556      *        functions are canceled and the exception is re-thrown
557      * @return a future to cancel or await an outcome, or {@code null} if no futures were
558      *         created. If this future is canceled, then all of the futures will be
559      *         canceled. Similarly, when this future completes, any incomplete futures
560      *         will be canceled
561      */
562     protected CompletableFuture<OperationOutcome> allOf(
563                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
564         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
565
566         Queue<OperationOutcome> outcomes = new LinkedList<>();
567
568         CompletableFuture<OperationOutcome>[] futures =
569                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
570                             synchronized (outcomes) {
571                                 outcomes.add(outcome);
572                             }
573                             return outcome;
574                         }));
575
576         if (futures.length == 0) {
577             // no futures were started
578             return null;
579         }
580
581         if (futures.length == 1) {
582             return futures[0];
583         }
584
585         // @formatter:off
586         CompletableFuture.allOf(futures)
587                         .thenApply(unused -> combineOutcomes(outcomes))
588                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
589         // @formatter:on
590
591         return controller;
592     }
593
594     /**
595      * Invokes the functions to create the futures and attaches them to the controller.
596      *
597      * @param controller master controller for all of the futures
598      * @param futureMakers futures to be attached to the controller
599      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
600      *        Returns the adorned future
601      * @return an array of futures, possibly zero-length. If the array is of size one,
602      *         then that one item should be returned instead of the controller
603      */
604     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
605                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
606                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
607
608         if (futureMakers.isEmpty()) {
609             @SuppressWarnings("unchecked")
610             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
611             return result;
612         }
613
614         // the last, unadorned future that is created
615         CompletableFuture<OperationOutcome> lastFuture = null;
616
617         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
618
619         // make each future
620         for (var maker : futureMakers) {
621             try {
622                 CompletableFuture<OperationOutcome> future = maker.get();
623                 if (future == null) {
624                     continue;
625                 }
626
627                 // propagate "stop" to the future
628                 controller.add(future);
629
630                 futures.add(adorn.apply(future));
631
632                 lastFuture = future;
633
634             } catch (RuntimeException e) {
635                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
636                 controller.cancel(false);
637                 throw e;
638             }
639         }
640
641         @SuppressWarnings("unchecked")
642         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
643
644         if (result.length == 1) {
645             // special case - return the unadorned future
646             result[0] = lastFuture;
647             return result;
648         }
649
650         return futures.toArray(result);
651     }
652
653     /**
654      * Combines the outcomes from a set of tasks.
655      *
656      * @param outcomes outcomes to be examined
657      * @return the combined outcome
658      */
659     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
660
661         // identify the outcome with the highest priority
662         OperationOutcome outcome = outcomes.remove();
663         int priority = detmPriority(outcome);
664
665         for (OperationOutcome outcome2 : outcomes) {
666             int priority2 = detmPriority(outcome2);
667
668             if (priority2 > priority) {
669                 outcome = outcome2;
670                 priority = priority2;
671             }
672         }
673
674         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
675                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
676
677         return outcome;
678     }
679
680     /**
681      * Determines the priority of an outcome based on its result.
682      *
683      * @param outcome outcome to examine, or {@code null}
684      * @return the outcome's priority
685      */
686     protected int detmPriority(OperationOutcome outcome) {
687         if (outcome == null || outcome.getResult() == null) {
688             return 1;
689         }
690
691         switch (outcome.getResult()) {
692             case SUCCESS:
693                 return 0;
694
695             case FAILURE_GUARD:
696                 return 2;
697
698             case FAILURE_RETRIES:
699                 return 3;
700
701             case FAILURE:
702                 return 4;
703
704             case FAILURE_TIMEOUT:
705                 return 5;
706
707             case FAILURE_EXCEPTION:
708             default:
709                 return 6;
710         }
711     }
712
713     /**
714      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
715      * not created until the previous task completes. The pipeline returns the outcome of
716      * the last task executed.
717      *
718      * @param futureMakers functions to make the futures
719      * @return a future to cancel the sequence or await the outcome
720      */
721     protected CompletableFuture<OperationOutcome> sequence(
722                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
723
724         return sequence(Arrays.asList(futureMakers));
725     }
726
727     /**
728      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
729      * not created until the previous task completes. The pipeline returns the outcome of
730      * the last task executed.
731      *
732      * @param futureMakers functions to make the futures
733      * @return a future to cancel the sequence or await the outcome, or {@code null} if
734      *         there were no tasks to perform
735      */
736     protected CompletableFuture<OperationOutcome> sequence(
737                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
738
739         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
740
741         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
742         if (nextTask == null) {
743             // no tasks
744             return null;
745         }
746
747         if (queue.isEmpty()) {
748             // only one task - just return it rather than wrapping it in a controller
749             return nextTask;
750         }
751
752         /*
753          * multiple tasks - need a controller to stop whichever task is currently
754          * executing
755          */
756         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
757         final Executor executor = params.getExecutor();
758
759         // @formatter:off
760         controller.wrap(nextTask)
761                     .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
762                     .whenCompleteAsync(controller.delayedComplete(), executor);
763         // @formatter:on
764
765         return controller;
766     }
767
768     /**
769      * Executes the next task in the queue, if the previous outcome was successful.
770      *
771      * @param controller pipeline controller
772      * @param taskQueue queue of tasks to be performed
773      * @return a future to execute the remaining tasks, or the current outcome, if it's a
774      *         failure, or if there are no more tasks
775      */
776     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
777                     PipelineControllerFuture<OperationOutcome> controller,
778                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
779
780         return outcome -> {
781             if (!isSuccess(outcome)) {
782                 // return the failure
783                 return CompletableFuture.completedFuture(outcome);
784             }
785
786             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
787             if (nextTask == null) {
788                 // no tasks - just return the success
789                 return CompletableFuture.completedFuture(outcome);
790             }
791
792             // @formatter:off
793             return controller
794                         .wrap(nextTask)
795                         .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
796             // @formatter:on
797         };
798     }
799
800     /**
801      * Gets the next task from the queue, skipping those that are {@code null}.
802      *
803      * @param taskQueue task queue
804      * @return the next task, or {@code null} if the queue is now empty
805      */
806     private CompletableFuture<OperationOutcome> getNextTask(
807                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
808
809         Supplier<CompletableFuture<OperationOutcome>> maker;
810
811         while ((maker = taskQueue.poll()) != null) {
812             CompletableFuture<OperationOutcome> future = maker.get();
813             if (future != null) {
814                 return future;
815             }
816         }
817
818         return null;
819     }
820
821     /**
822      * Sets the start time of the operation and invokes the callback to indicate that the
823      * operation has started. Does nothing if the pipeline has been stopped.
824      * <p/>
825      * This assumes that the "outcome" is not {@code null}.
826      *
827      * @param callbacks used to determine if the start callback can be invoked
828      * @return a function that sets the start time and invokes the callback
829      */
830     private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
831
832         return (outcome, thrown) -> {
833
834             if (callbacks.canStart()) {
835                 // haven't invoked "start" callback yet
836                 outcome.setStart(callbacks.getStartTime());
837                 outcome.setEnd(null);
838                 params.callbackStarted(outcome);
839             }
840         };
841     }
842
843     /**
844      * Sets the end time of the operation and invokes the callback to indicate that the
845      * operation has completed. Does nothing if the pipeline has been stopped.
846      * <p/>
847      * This assumes that the "outcome" is not {@code null}.
848      * <p/>
849      * Note: the start time must be a reference rather than a plain value, because it's
850      * value must be gotten on-demand, when the returned function is executed at a later
851      * time.
852      *
853      * @param callbacks used to determine if the end callback can be invoked
854      * @return a function that sets the end time and invokes the callback
855      */
856     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
857
858         return (outcome, thrown) -> {
859
860             if (callbacks.canEnd()) {
861                 outcome.setStart(callbacks.getStartTime());
862                 outcome.setEnd(callbacks.getEndTime());
863                 params.callbackCompleted(outcome);
864             }
865         };
866     }
867
868     /**
869      * Sets an operation's outcome and message, based on a throwable.
870      *
871      * @param operation operation to be updated
872      * @return the updated operation
873      */
874     protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
875         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
876         return setOutcome(operation, result);
877     }
878
879     /**
880      * Sets an operation's outcome and default message based on the result.
881      *
882      * @param operation operation to be updated
883      * @param result result of the operation
884      * @return the updated operation
885      */
886     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
887         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
888         operation.setResult(result);
889         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
890                         : ControlLoopOperation.FAILED_MSG);
891
892         return operation;
893     }
894
895     /**
896      * Determines if a throwable is due to a timeout.
897      *
898      * @param thrown throwable of interest
899      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
900      */
901     protected boolean isTimeout(Throwable thrown) {
902         if (thrown instanceof CompletionException) {
903             thrown = thrown.getCause();
904         }
905
906         return (thrown instanceof TimeoutException);
907     }
908
909     /**
910      * Logs a response. If the response is not of type, String, then it attempts to
911      * pretty-print it into JSON before logging.
912      *
913      * @param direction IN or OUT
914      * @param infra communication infrastructure on which it was published
915      * @param source source name (e.g., the URL or Topic name)
916      * @param response response to be logged
917      * @return the JSON text that was logged
918      */
919     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
920         String json;
921         try {
922             if (response == null) {
923                 json = null;
924             } else if (response instanceof String) {
925                 json = response.toString();
926             } else {
927                 json = makeCoder().encode(response, true);
928             }
929
930         } catch (CoderException e) {
931             String type = (direction == EventType.IN ? "response" : "request");
932             logger.warn("cannot pretty-print {}", type, e);
933             json = response.toString();
934         }
935
936         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
937
938         return json;
939     }
940
941     // these may be overridden by subclasses or junit tests
942
943     /**
944      * Gets the retry count.
945      *
946      * @param retry retry, extracted from the parameters, or {@code null}
947      * @return the number of retries, or {@code 0} if no retries were specified
948      */
949     protected int getRetry(Integer retry) {
950         return (retry == null ? 0 : retry);
951     }
952
953     /**
954      * Gets the retry wait, in milliseconds.
955      *
956      * @return the retry wait, in milliseconds
957      */
958     protected long getRetryWaitMs() {
959         return DEFAULT_RETRY_WAIT_MS;
960     }
961
962     /**
963      * Gets the operation timeout.
964      *
965      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
966      *        {@code null}
967      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
968      *         specified
969      */
970     protected long getTimeoutMs(Integer timeoutSec) {
971         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
972     }
973
974     // these may be overridden by junit tests
975
976     protected Coder makeCoder() {
977         return coder;
978     }
979 }