3281ddb5fd22b1db24bdcb6b409274f1ec90324e
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.UUID;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.CompletionException;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.function.BiConsumer;
39 import java.util.function.Function;
40 import java.util.function.Supplier;
41 import java.util.function.UnaryOperator;
42 import lombok.AccessLevel;
43 import lombok.Getter;
44 import lombok.Setter;
45 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
46 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
48 import org.onap.policy.common.utils.coder.Coder;
49 import org.onap.policy.common.utils.coder.CoderException;
50 import org.onap.policy.common.utils.coder.StandardCoder;
51 import org.onap.policy.controlloop.ControlLoopOperation;
52 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
53 import org.onap.policy.controlloop.actorserviceprovider.Operation;
54 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
59 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * Partial implementation of an operator. In general, it's preferable that subclasses
65  * would override {@link #startOperationAsync(int, OperationOutcome)
66  * startOperationAsync()}. However, if that proves to be too difficult, then they can
67  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}.
68  * <p/>
69  * The futures returned by the methods within this class can be canceled, and will
70  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
71  * returned by overridden methods will do the same. Of course, if a class overrides
72  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
73  * be done to cancel that particular operation.
74  * <p/>
75  * In general tasks in a pipeline are executed by the same thread. However, the following
76  * should always be executed via the executor specified in "params":
77  * <ul>
78  * <li>start callback</li>
79  * <li>completion callback</li>
80  * <li>controller completion (i.e., delayedComplete())</li>
81  * </ul>
82  */
83 public abstract class OperationPartial implements Operation {
84     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
85     private static final Coder coder = new StandardCoder();
86
87     public static final String GUARD_ACTOR_NAME = "GUARD";
88     public static final String GUARD_OPERATION_NAME = "Decision";
89     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
90
91     private final OperatorConfig config;
92
93     /**
94      * Operation parameters.
95      */
96     protected final ControlLoopOperationParams params;
97
98     @Getter
99     private final String fullName;
100
101     @Getter
102     @Setter(AccessLevel.PROTECTED)
103     private String subRequestId;
104
105     @Getter
106     private final List<String> propertyNames;
107
108     /**
109      * Values for the properties identified by {@link #getPropertyNames()}.
110      */
111     private final Map<String, Object> properties = new HashMap<>();
112
113
114     /**
115      * Constructs the object.
116      *
117      * @param params operation parameters
118      * @param config configuration for this operation
119      * @param propertyNames names of properties required by this operation
120      */
121     protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
122         this.params = params;
123         this.config = config;
124         this.fullName = params.getActor() + "." + params.getOperation();
125         this.propertyNames = propertyNames;
126     }
127
128     public Executor getBlockingExecutor() {
129         return config.getBlockingExecutor();
130     }
131
132     @Override
133     public String getActorName() {
134         return params.getActor();
135     }
136
137     @Override
138     public String getName() {
139         return params.getOperation();
140     }
141
142     @Override
143     public boolean containsProperty(String name) {
144         return properties.containsKey(name);
145     }
146
147     @Override
148     public void setProperty(String name, Object value) {
149         logger.info("{}: set property {}={}", getFullName(), name, value);
150         properties.put(name, value);
151     }
152
153     @SuppressWarnings("unchecked")
154     @Override
155     public <T> T getProperty(String name) {
156         return (T) properties.get(name);
157     }
158
159     /**
160      * Gets a property value, throwing an exception if it's missing.
161      *
162      * @param name property name
163      * @param propertyType property type, used in an error message if the property value
164      *        is {@code null}
165      * @return the property value
166      */
167     @SuppressWarnings("unchecked")
168     public <T> T getRequiredProperty(String name, String propertyType) {
169         T value = (T) properties.get(name);
170         if (value == null) {
171             throw new IllegalStateException("missing " + propertyType);
172         }
173
174         return value;
175     }
176
177     @Override
178     public CompletableFuture<OperationOutcome> start() {
179         // allocate a controller for the entire operation
180         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
181
182         // start attempt #1
183         return startOperationAttempt(controller, 1);
184     }
185
186     /**
187      * Starts the operation attempt. When all retries complete, it will complete the
188      * controller.
189      *
190      * @param controller controller for all operation attempts
191      * @param attempt attempt number, typically starting with 1
192      * @return a future that will return the final result of all attempts
193      */
194     private CompletableFuture<OperationOutcome> startOperationAttempt(
195                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
196
197         generateSubRequestId(attempt);
198
199         // propagate "stop" to the operation attempt
200         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
201                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
202
203         return controller;
204     }
205
206     /**
207      * Generates and sets {@link #subRequestId} to a new subrequest ID.
208      *
209      * @param attempt attempt number, typically starting with 1
210      */
211     public void generateSubRequestId(int attempt) {
212         // Note: this should be "protected", but that makes junits much messier
213
214         setSubRequestId(UUID.randomUUID().toString());
215     }
216
217     /**
218      * Starts the operation attempt, without doing any retries.
219      *
220      * @param attempt attempt number, typically starting with 1
221      * @return a future that will return the result of a single operation attempt
222      */
223     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
224
225         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
226
227         final var executor = params.getExecutor();
228         final var outcome = makeOutcome();
229         final var callbacks = new CallbackManager();
230
231         // this operation attempt gets its own controller
232         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
233
234         // propagate "stop" to the callbacks
235         controller.add(callbacks);
236
237         // @formatter:off
238         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
239                         .whenCompleteAsync(callbackStarted(callbacks), executor)
240                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
241         // @formatter:on
242
243         // handle timeouts, if specified
244         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
245         if (timeoutMillis > 0) {
246             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
247             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
248         }
249
250         /*
251          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
252          * before callbackCompleted() is invoked.
253          *
254          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
255          * the pipeline as the last step anyway.
256          */
257
258         // @formatter:off
259         future.exceptionally(fromException("operation"))
260                     .thenApply(setRetryFlag(attempt))
261                     .whenCompleteAsync(callbackStarted(callbacks), executor)
262                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
263                     .whenCompleteAsync(controller.delayedComplete(), executor);
264         // @formatter:on
265
266         return controller;
267     }
268
269     /**
270      * Determines if the outcome was successful.
271      *
272      * @param outcome outcome to examine
273      * @return {@code true} if the outcome was successful
274      */
275     protected boolean isSuccess(OperationOutcome outcome) {
276         return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
277     }
278
279     /**
280      * Determines if the outcome was a failure for this operator.
281      *
282      * @param outcome outcome to examine, or {@code null}
283      * @return {@code true} if the outcome is not {@code null} and was a failure
284      *         <i>and</i> was associated with this operator, {@code false} otherwise
285      */
286     protected boolean isActorFailed(OperationOutcome outcome) {
287         return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
288     }
289
290     /**
291      * Determines if the given outcome is for this operation.
292      *
293      * @param outcome outcome to examine
294      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
295      */
296     protected boolean isSameOperation(OperationOutcome outcome) {
297         return OperationOutcome.isFor(outcome, getActorName(), getName());
298     }
299
300     /**
301      * Invokes the operation as a "future". This method simply invokes
302      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
303      * returning the result via a "future".
304      * <p/>
305      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
306      * the executor in the "params", as that may bring the background thread pool to a
307      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
308      * instead.
309      * <p/>
310      * This method assumes the following:
311      * <ul>
312      * <li>the operator is alive</li>
313      * <li>verifyRunning() has been invoked</li>
314      * <li>callbackStarted() has been invoked</li>
315      * <li>the invoker will perform appropriate timeout checks</li>
316      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
317      * </ul>
318      *
319      * @param attempt attempt number, typically starting with 1
320      * @return a function that will start the operation and return its result when
321      *         complete
322      */
323     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
324
325         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
326     }
327
328     /**
329      * Low-level method that performs the operation. This can make the same assumptions
330      * that are made by {@link #doOperationAsFuture()}. This particular method simply
331      * throws an {@link UnsupportedOperationException}.
332      *
333      * @param attempt attempt number, typically starting with 1
334      * @param operation the operation being performed
335      * @return the outcome of the operation
336      */
337     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
338
339         throw new UnsupportedOperationException("start operation " + getFullName());
340     }
341
342     /**
343      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
344      * FAILURE, assuming the policy specifies retries and the retry count has been
345      * exhausted.
346      *
347      * @param attempt latest attempt number, starting with 1
348      * @return a function to get the next future to execute
349      */
350     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
351
352         return origOutcome -> {
353             // ensure we have a non-null outcome
354             OperationOutcome outcome;
355             if (origOutcome != null) {
356                 outcome = origOutcome;
357             } else {
358                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
359                 outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE);
360             }
361
362             // ensure correct actor/operation
363             outcome.setActor(getActorName());
364             outcome.setOperation(getName());
365
366             // determine if we should retry, based on the result
367             if (outcome.getResult() != OperationResult.FAILURE) {
368                 // do not retry success or other failure types (e.g., exception)
369                 outcome.setFinalOutcome(true);
370                 return outcome;
371             }
372
373             int retry = getRetry(params.getRetry());
374             if (retry <= 0) {
375                 // no retries were specified
376                 outcome.setFinalOutcome(true);
377
378             } else if (attempt <= retry) {
379                 // have more retries - not the final outcome
380                 outcome.setFinalOutcome(false);
381
382             } else {
383                 /*
384                  * retries were specified and we've already tried them all - change to
385                  * FAILURE_RETRIES
386                  */
387                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
388                 outcome.setResult(OperationResult.FAILURE_RETRIES);
389                 outcome.setFinalOutcome(true);
390             }
391
392             return outcome;
393         };
394     }
395
396     /**
397      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
398      * was previously invoked, and thus that the "operation" is not {@code null}.
399      *
400      * @param controller controller for all of the retries
401      * @param attempt latest attempt number, starting with 1
402      * @return a function to get the next future to execute
403      */
404     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
405                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
406
407         return operation -> {
408             if (!isActorFailed(operation)) {
409                 // wrong type or wrong operation - just leave it as is
410                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
411                 controller.complete(operation);
412                 return new CompletableFuture<>();
413             }
414
415             if (getRetry(params.getRetry()) <= 0) {
416                 // no retries - already marked as FAILURE, so just return it
417                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
418                 controller.complete(operation);
419                 return new CompletableFuture<>();
420             }
421
422             /*
423              * Retry the operation.
424              */
425             long waitMs = getRetryWaitMs();
426             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
427
428             return sleep(waitMs, TimeUnit.MILLISECONDS)
429                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
430         };
431     }
432
433     /**
434      * Convenience method that starts a sleep(), running via a future.
435      *
436      * @param sleepTime time to sleep
437      * @param unit time unit
438      * @return a future that will complete when the sleep completes
439      */
440     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
441         if (sleepTime <= 0) {
442             return CompletableFuture.completedFuture(null);
443         }
444
445         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
446     }
447
448     /**
449      * Converts an exception into an operation outcome, returning a copy of the outcome to
450      * prevent background jobs from changing it.
451      *
452      * @param type type of item throwing the exception
453      * @return a function that will convert an exception into an operation outcome
454      */
455     private Function<Throwable, OperationOutcome> fromException(String type) {
456
457         return thrown -> {
458             OperationOutcome outcome = makeOutcome();
459
460             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
461                 // do not include exception in the message, as it just clutters the log
462                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
463                                 params.getRequestId());
464             } else {
465                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
466                                 params.getRequestId(), thrown);
467             }
468
469             return setOutcome(outcome, thrown);
470         };
471     }
472
473     /**
474      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
475      * any outstanding futures when one completes.
476      *
477      * @param futureMakers function to make a future. If the function returns
478      *        {@code null}, then no future is created for that function. On the other
479      *        hand, if the function throws an exception, then the previously created
480      *        functions are canceled and the exception is re-thrown
481      * @return a future to cancel or await an outcome, or {@code null} if no futures were
482      *         created. If this future is canceled, then all of the futures will be
483      *         canceled
484      */
485     public CompletableFuture<OperationOutcome> anyOf(
486                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
487
488         return anyOf(Arrays.asList(futureMakers));
489     }
490
491     /**
492      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
493      * any outstanding futures when one completes.
494      *
495      * @param futureMakers function to make a future. If the function returns
496      *        {@code null}, then no future is created for that function. On the other
497      *        hand, if the function throws an exception, then the previously created
498      *        functions are canceled and the exception is re-thrown
499      * @return a future to cancel or await an outcome, or {@code null} if no futures were
500      *         created. If this future is canceled, then all of the futures will be
501      *         canceled. Similarly, when this future completes, any incomplete futures
502      *         will be canceled
503      */
504     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
505
506         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
507
508         CompletableFuture<OperationOutcome>[] futures =
509                         attachFutures(controller, futureMakers, UnaryOperator.identity());
510
511         if (futures.length == 0) {
512             // no futures were started
513             return null;
514         }
515
516         if (futures.length == 1) {
517             return futures[0];
518         }
519
520         CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast)
521                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
522
523         return controller;
524     }
525
526     /**
527      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
528      *
529      * @param futureMakers function to make a future. If the function returns
530      *        {@code null}, then no future is created for that function. On the other
531      *        hand, if the function throws an exception, then the previously created
532      *        functions are canceled and the exception is re-thrown
533      * @return a future to cancel or await an outcome, or {@code null} if no futures were
534      *         created. If this future is canceled, then all of the futures will be
535      *         canceled
536      */
537     public CompletableFuture<OperationOutcome> allOf(
538                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
539
540         return allOf(Arrays.asList(futureMakers));
541     }
542
543     /**
544      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
545      *
546      * @param futureMakers function to make a future. If the function returns
547      *        {@code null}, then no future is created for that function. On the other
548      *        hand, if the function throws an exception, then the previously created
549      *        functions are canceled and the exception is re-thrown
550      * @return a future to cancel or await an outcome, or {@code null} if no futures were
551      *         created. If this future is canceled, then all of the futures will be
552      *         canceled. Similarly, when this future completes, any incomplete futures
553      *         will be canceled
554      */
555     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
556         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
557
558         Queue<OperationOutcome> outcomes = new LinkedList<>();
559
560         CompletableFuture<OperationOutcome>[] futures =
561                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
562                             synchronized (outcomes) {
563                                 outcomes.add(outcome);
564                             }
565                             return outcome;
566                         }));
567
568         if (futures.length == 0) {
569             // no futures were started
570             return null;
571         }
572
573         if (futures.length == 1) {
574             return futures[0];
575         }
576
577         // @formatter:off
578         CompletableFuture.allOf(futures)
579                         .thenApply(unused -> combineOutcomes(outcomes))
580                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
581         // @formatter:on
582
583         return controller;
584     }
585
586     /**
587      * Invokes the functions to create the futures and attaches them to the controller.
588      *
589      * @param controller master controller for all of the futures
590      * @param futureMakers futures to be attached to the controller
591      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
592      *        Returns the adorned future
593      * @return an array of futures, possibly zero-length. If the array is of size one,
594      *         then that one item should be returned instead of the controller
595      */
596     @SuppressWarnings("unchecked")
597     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
598                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
599                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
600
601         if (futureMakers.isEmpty()) {
602             return new CompletableFuture[0];
603         }
604
605         // the last, unadorned future that is created
606         CompletableFuture<OperationOutcome> lastFuture = null;
607
608         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
609
610         // make each future
611         for (var maker : futureMakers) {
612             try {
613                 CompletableFuture<OperationOutcome> future = maker.get();
614                 if (future == null) {
615                     continue;
616                 }
617
618                 // propagate "stop" to the future
619                 controller.add(future);
620
621                 futures.add(adorn.apply(future));
622
623                 lastFuture = future;
624
625             } catch (RuntimeException e) {
626                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
627                 controller.cancel(false);
628                 throw e;
629             }
630         }
631
632         var result = new CompletableFuture[futures.size()];
633
634         if (result.length == 1) {
635             // special case - return the unadorned future
636             result[0] = lastFuture;
637             return result;
638         }
639
640         return futures.toArray(result);
641     }
642
643     /**
644      * Combines the outcomes from a set of tasks.
645      *
646      * @param outcomes outcomes to be examined
647      * @return the combined outcome
648      */
649     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
650
651         // identify the outcome with the highest priority
652         OperationOutcome outcome = outcomes.remove();
653         int priority = detmPriority(outcome);
654
655         for (OperationOutcome outcome2 : outcomes) {
656             int priority2 = detmPriority(outcome2);
657
658             if (priority2 > priority) {
659                 outcome = outcome2;
660                 priority = priority2;
661             }
662         }
663
664         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
665                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
666
667         return outcome;
668     }
669
670     /**
671      * Determines the priority of an outcome based on its result.
672      *
673      * @param outcome outcome to examine, or {@code null}
674      * @return the outcome's priority
675      */
676     protected int detmPriority(OperationOutcome outcome) {
677         if (outcome == null || outcome.getResult() == null) {
678             return 1;
679         }
680
681         switch (outcome.getResult()) {
682             case SUCCESS:
683                 return 0;
684
685             case FAILURE_GUARD:
686                 return 2;
687
688             case FAILURE_RETRIES:
689                 return 3;
690
691             case FAILURE:
692                 return 4;
693
694             case FAILURE_TIMEOUT:
695                 return 5;
696
697             case FAILURE_EXCEPTION:
698             default:
699                 return 6;
700         }
701     }
702
703     /**
704      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
705      * not created until the previous task completes. The pipeline returns the outcome of
706      * the last task executed.
707      *
708      * @param futureMakers functions to make the futures
709      * @return a future to cancel the sequence or await the outcome
710      */
711     public CompletableFuture<OperationOutcome> sequence(
712                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
713
714         return sequence(Arrays.asList(futureMakers));
715     }
716
717     /**
718      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
719      * not created until the previous task completes. The pipeline returns the outcome of
720      * the last task executed.
721      *
722      * @param futureMakers functions to make the futures
723      * @return a future to cancel the sequence or await the outcome, or {@code null} if
724      *         there were no tasks to perform
725      */
726     public CompletableFuture<OperationOutcome> sequence(
727                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
728
729         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
730
731         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
732         if (nextTask == null) {
733             // no tasks
734             return null;
735         }
736
737         if (queue.isEmpty()) {
738             // only one task - just return it rather than wrapping it in a controller
739             return nextTask;
740         }
741
742         /*
743          * multiple tasks - need a controller to stop whichever task is currently
744          * executing
745          */
746         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
747         final var executor = params.getExecutor();
748
749         // @formatter:off
750         controller.wrap(nextTask)
751                     .thenCompose(nextTaskOnSuccess(controller, queue))
752                     .whenCompleteAsync(controller.delayedComplete(), executor);
753         // @formatter:on
754
755         return controller;
756     }
757
758     /**
759      * Executes the next task in the queue, if the previous outcome was successful.
760      *
761      * @param controller pipeline controller
762      * @param taskQueue queue of tasks to be performed
763      * @return a future to execute the remaining tasks, or the current outcome, if it's a
764      *         failure, or if there are no more tasks
765      */
766     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
767                     PipelineControllerFuture<OperationOutcome> controller,
768                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
769
770         return outcome -> {
771             if (!isSuccess(outcome)) {
772                 // return the failure
773                 return CompletableFuture.completedFuture(outcome);
774             }
775
776             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
777             if (nextTask == null) {
778                 // no tasks - just return the success
779                 return CompletableFuture.completedFuture(outcome);
780             }
781
782             // @formatter:off
783             return controller
784                         .wrap(nextTask)
785                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
786             // @formatter:on
787         };
788     }
789
790     /**
791      * Gets the next task from the queue, skipping those that are {@code null}.
792      *
793      * @param taskQueue task queue
794      * @return the next task, or {@code null} if the queue is now empty
795      */
796     private CompletableFuture<OperationOutcome> getNextTask(
797                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
798
799         Supplier<CompletableFuture<OperationOutcome>> maker;
800
801         while ((maker = taskQueue.poll()) != null) {
802             CompletableFuture<OperationOutcome> future = maker.get();
803             if (future != null) {
804                 return future;
805             }
806         }
807
808         return null;
809     }
810
811     /**
812      * Sets the start time of the operation and invokes the callback to indicate that the
813      * operation has started. Does nothing if the pipeline has been stopped.
814      * <p/>
815      * This assumes that the "outcome" is not {@code null}.
816      *
817      * @param callbacks used to determine if the start callback can be invoked
818      * @return a function that sets the start time and invokes the callback
819      */
820     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
821
822         return (outcome, thrown) -> {
823
824             if (callbacks.canStart()) {
825                 outcome.setSubRequestId(getSubRequestId());
826                 outcome.setStart(callbacks.getStartTime());
827                 outcome.setEnd(null);
828
829                 // pass a copy to the callback
830                 var outcome2 = new OperationOutcome(outcome);
831                 outcome2.setFinalOutcome(false);
832                 params.callbackStarted(outcome2);
833             }
834         };
835     }
836
837     /**
838      * Sets the end time of the operation and invokes the callback to indicate that the
839      * operation has completed. Does nothing if the pipeline has been stopped.
840      * <p/>
841      * This assumes that the "outcome" is not {@code null}.
842      * <p/>
843      * Note: the start time must be a reference rather than a plain value, because it's
844      * value must be gotten on-demand, when the returned function is executed at a later
845      * time.
846      *
847      * @param callbacks used to determine if the end callback can be invoked
848      * @return a function that sets the end time and invokes the callback
849      */
850     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
851
852         return (outcome, thrown) -> {
853             if (callbacks.canEnd()) {
854                 outcome.setSubRequestId(getSubRequestId());
855                 outcome.setStart(callbacks.getStartTime());
856                 outcome.setEnd(callbacks.getEndTime());
857
858                 // pass a copy to the callback
859                 params.callbackCompleted(new OperationOutcome(outcome));
860             }
861         };
862     }
863
864     /**
865      * Sets an operation's outcome and message, based on a throwable.
866      *
867      * @param operation operation to be updated
868      * @return the updated operation
869      */
870     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
871         OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
872                 : OperationResult.FAILURE_EXCEPTION);
873         return setOutcome(operation, result);
874     }
875
876     /**
877      * Sets an operation's outcome and default message based on the result.
878      *
879      * @param operation operation to be updated
880      * @param result result of the operation
881      * @return the updated operation
882      */
883     public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
884         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
885         operation.setResult(result);
886         operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
887                         : ControlLoopOperation.FAILED_MSG);
888
889         return operation;
890     }
891
892     /**
893      * Makes an outcome, populating the "target" field with the contents of the target
894      * entity property.
895      *
896      * @return a new operation outcome
897      */
898     protected OperationOutcome makeOutcome() {
899         OperationOutcome outcome = params.makeOutcome();
900         outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY));
901         return outcome;
902     }
903
904     /**
905      * Determines if a throwable is due to a timeout.
906      *
907      * @param thrown throwable of interest
908      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
909      */
910     protected boolean isTimeout(Throwable thrown) {
911         if (thrown instanceof CompletionException) {
912             thrown = thrown.getCause();
913         }
914
915         return (thrown instanceof TimeoutException);
916     }
917
918     /**
919      * Logs a message. If the message is not of type, String, then it attempts to
920      * pretty-print it into JSON before logging.
921      *
922      * @param direction IN or OUT
923      * @param infra communication infrastructure on which it was published
924      * @param source source name (e.g., the URL or Topic name)
925      * @param message message to be logged
926      * @return the JSON text that was logged
927      */
928     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
929         String json;
930         try {
931             json = prettyPrint(message);
932
933         } catch (IllegalArgumentException e) {
934             String type = (direction == EventType.IN ? "response" : "request");
935             logger.warn("cannot pretty-print {}", type, e);
936             json = message.toString();
937         }
938
939         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
940
941         return json;
942     }
943
944     /**
945      * Converts a message to a "pretty-printed" String using the operation's normal
946      * serialization provider (i.e., it's <i>coder</i>).
947      *
948      * @param message response to be logged
949      * @return the JSON text that was logged
950      * @throws IllegalArgumentException if the message cannot be converted
951      */
952     public <T> String prettyPrint(T message) {
953         if (message == null) {
954             return null;
955         } else if (message instanceof String) {
956             return message.toString();
957         } else {
958             try {
959                 return getCoder().encode(message, true);
960             } catch (CoderException e) {
961                 throw new IllegalArgumentException("cannot encode message", e);
962             }
963         }
964     }
965
966     // these may be overridden by subclasses or junit tests
967
968     /**
969      * Gets the retry count.
970      *
971      * @param retry retry, extracted from the parameters, or {@code null}
972      * @return the number of retries, or {@code 0} if no retries were specified
973      */
974     protected int getRetry(Integer retry) {
975         return (retry == null ? 0 : retry);
976     }
977
978     /**
979      * Gets the retry wait, in milliseconds.
980      *
981      * @return the retry wait, in milliseconds
982      */
983     protected long getRetryWaitMs() {
984         return DEFAULT_RETRY_WAIT_MS;
985     }
986
987     /**
988      * Gets the operation timeout.
989      *
990      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
991      *        {@code null}
992      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
993      *         specified
994      */
995     protected long getTimeoutMs(Integer timeoutSec) {
996         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
997     }
998
999     // these may be overridden by junit tests
1000
1001     protected Coder getCoder() {
1002         return coder;
1003     }
1004 }