Upgrade to oparent 3.2.1
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.UUID;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.CompletionException;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.function.BiConsumer;
39 import java.util.function.Function;
40 import java.util.function.Supplier;
41 import java.util.function.UnaryOperator;
42 import lombok.AccessLevel;
43 import lombok.Getter;
44 import lombok.Setter;
45 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
46 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
48 import org.onap.policy.common.utils.coder.Coder;
49 import org.onap.policy.common.utils.coder.CoderException;
50 import org.onap.policy.common.utils.coder.StandardCoder;
51 import org.onap.policy.controlloop.ControlLoopOperation;
52 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
53 import org.onap.policy.controlloop.actorserviceprovider.Operation;
54 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
59 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * Partial implementation of an operator. In general, it's preferable that subclasses
65  * would override {@link #startOperationAsync(int, OperationOutcome)
66  * startOperationAsync()}. However, if that proves to be too difficult, then they can
67  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}.
68  * <p/>
69  * The futures returned by the methods within this class can be canceled, and will
70  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
71  * returned by overridden methods will do the same. Of course, if a class overrides
72  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
73  * be done to cancel that particular operation.
74  * <p/>
75  * In general tasks in a pipeline are executed by the same thread. However, the following
76  * should always be executed via the executor specified in "params":
77  * <ul>
78  * <li>start callback</li>
79  * <li>completion callback</li>
80  * <li>controller completion (i.e., delayedComplete())</li>
81  * </ul>
82  */
83 public abstract class OperationPartial implements Operation {
84     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
85     private static final Coder coder = new StandardCoder();
86
87     public static final String GUARD_ACTOR_NAME = "GUARD";
88     public static final String GUARD_OPERATION_NAME = "Decision";
89     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
90
91     private final OperatorConfig config;
92
93     /**
94      * Operation parameters.
95      */
96     protected final ControlLoopOperationParams params;
97
98     @Getter
99     private final String fullName;
100
101     @Getter
102     @Setter(AccessLevel.PROTECTED)
103     private String subRequestId;
104
105     @Getter
106     private final List<String> propertyNames;
107
108     /**
109      * Values for the properties identified by {@link #getPropertyNames()}.
110      */
111     private final Map<String, Object> properties = new HashMap<>();
112
113
114     /**
115      * Constructs the object.
116      *
117      * @param params operation parameters
118      * @param config configuration for this operation
119      * @param propertyNames names of properties required by this operation
120      */
121     protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
122         this.params = params;
123         this.config = config;
124         this.fullName = params.getActor() + "." + params.getOperation();
125         this.propertyNames = propertyNames;
126     }
127
128     public Executor getBlockingExecutor() {
129         return config.getBlockingExecutor();
130     }
131
132     @Override
133     public String getActorName() {
134         return params.getActor();
135     }
136
137     @Override
138     public String getName() {
139         return params.getOperation();
140     }
141
142     @Override
143     public boolean containsProperty(String name) {
144         return properties.containsKey(name);
145     }
146
147     @Override
148     public void setProperty(String name, Object value) {
149         logger.info("{}: set property {}={}", getFullName(), name, value);
150         properties.put(name, value);
151     }
152
153     @SuppressWarnings("unchecked")
154     @Override
155     public <T> T getProperty(String name) {
156         return (T) properties.get(name);
157     }
158
159     /**
160      * Gets a property value, throwing an exception if it's missing.
161      *
162      * @param name property name
163      * @param propertyType property type, used in an error message if the property value
164      *        is {@code null}
165      * @return the property value
166      */
167     @SuppressWarnings("unchecked")
168     protected <T> T getRequiredProperty(String name, String propertyType) {
169         T value = (T) properties.get(name);
170         if (value == null) {
171             throw new IllegalStateException("missing " + propertyType);
172         }
173
174         return value;
175     }
176
177     @Override
178     public CompletableFuture<OperationOutcome> start() {
179         // allocate a controller for the entire operation
180         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
181
182         // start attempt #1
183         return startOperationAttempt(controller, 1);
184     }
185
186     /**
187      * Starts the operation attempt. When all retries complete, it will complete the
188      * controller.
189      *
190      * @param controller controller for all operation attempts
191      * @param attempt attempt number, typically starting with 1
192      * @return a future that will return the final result of all attempts
193      */
194     private CompletableFuture<OperationOutcome> startOperationAttempt(
195                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
196
197         generateSubRequestId(attempt);
198
199         // propagate "stop" to the operation attempt
200         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
201                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
202
203         return controller;
204     }
205
206     /**
207      * Generates and sets {@link #subRequestId} to a new subrequest ID.
208      *
209      * @param attempt attempt number, typically starting with 1
210      */
211     public void generateSubRequestId(int attempt) {
212         // Note: this should be "protected", but that makes junits much messier
213
214         setSubRequestId(UUID.randomUUID().toString());
215     }
216
217     /**
218      * Starts the operation attempt, without doing any retries.
219      *
220      * @param attempt attempt number, typically starting with 1
221      * @return a future that will return the result of a single operation attempt
222      */
223     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
224
225         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
226
227         final Executor executor = params.getExecutor();
228         final OperationOutcome outcome = makeOutcome();
229         final CallbackManager callbacks = new CallbackManager();
230
231         // this operation attempt gets its own controller
232         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
233
234         // propagate "stop" to the callbacks
235         controller.add(callbacks);
236
237         // @formatter:off
238         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
239                         .whenCompleteAsync(callbackStarted(callbacks), executor)
240                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
241         // @formatter:on
242
243         // handle timeouts, if specified
244         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
245         if (timeoutMillis > 0) {
246             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
247             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
248         }
249
250         /*
251          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
252          * before callbackCompleted() is invoked.
253          *
254          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
255          * the pipeline as the last step anyway.
256          */
257
258         // @formatter:off
259         future.exceptionally(fromException("operation"))
260                     .thenApply(setRetryFlag(attempt))
261                     .whenCompleteAsync(callbackStarted(callbacks), executor)
262                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
263                     .whenCompleteAsync(controller.delayedComplete(), executor);
264         // @formatter:on
265
266         return controller;
267     }
268
269     /**
270      * Determines if the outcome was successful.
271      *
272      * @param outcome outcome to examine
273      * @return {@code true} if the outcome was successful
274      */
275     protected boolean isSuccess(OperationOutcome outcome) {
276         return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
277     }
278
279     /**
280      * Determines if the outcome was a failure for this operator.
281      *
282      * @param outcome outcome to examine, or {@code null}
283      * @return {@code true} if the outcome is not {@code null} and was a failure
284      *         <i>and</i> was associated with this operator, {@code false} otherwise
285      */
286     protected boolean isActorFailed(OperationOutcome outcome) {
287         return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
288     }
289
290     /**
291      * Determines if the given outcome is for this operation.
292      *
293      * @param outcome outcome to examine
294      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
295      */
296     protected boolean isSameOperation(OperationOutcome outcome) {
297         return OperationOutcome.isFor(outcome, getActorName(), getName());
298     }
299
300     /**
301      * Invokes the operation as a "future". This method simply invokes
302      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
303      * returning the result via a "future".
304      * <p/>
305      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
306      * the executor in the "params", as that may bring the background thread pool to a
307      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
308      * instead.
309      * <p/>
310      * This method assumes the following:
311      * <ul>
312      * <li>the operator is alive</li>
313      * <li>verifyRunning() has been invoked</li>
314      * <li>callbackStarted() has been invoked</li>
315      * <li>the invoker will perform appropriate timeout checks</li>
316      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
317      * </ul>
318      *
319      * @param attempt attempt number, typically starting with 1
320      * @return a function that will start the operation and return its result when
321      *         complete
322      */
323     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
324
325         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
326     }
327
328     /**
329      * Low-level method that performs the operation. This can make the same assumptions
330      * that are made by {@link #doOperationAsFuture()}. This particular method simply
331      * throws an {@link UnsupportedOperationException}.
332      *
333      * @param attempt attempt number, typically starting with 1
334      * @param operation the operation being performed
335      * @return the outcome of the operation
336      */
337     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
338
339         throw new UnsupportedOperationException("start operation " + getFullName());
340     }
341
342     /**
343      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
344      * FAILURE, assuming the policy specifies retries and the retry count has been
345      * exhausted.
346      *
347      * @param attempt latest attempt number, starting with 1
348      * @return a function to get the next future to execute
349      */
350     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
351
352         return origOutcome -> {
353             // ensure we have a non-null outcome
354             OperationOutcome outcome;
355             if (origOutcome != null) {
356                 outcome = origOutcome;
357             } else {
358                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
359                 outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE);
360             }
361
362             // ensure correct actor/operation
363             outcome.setActor(getActorName());
364             outcome.setOperation(getName());
365
366             // determine if we should retry, based on the result
367             if (outcome.getResult() != OperationResult.FAILURE) {
368                 // do not retry success or other failure types (e.g., exception)
369                 outcome.setFinalOutcome(true);
370                 return outcome;
371             }
372
373             int retry = getRetry(params.getRetry());
374             if (retry <= 0) {
375                 // no retries were specified
376                 outcome.setFinalOutcome(true);
377
378             } else if (attempt <= retry) {
379                 // have more retries - not the final outcome
380                 outcome.setFinalOutcome(false);
381
382             } else {
383                 /*
384                  * retries were specified and we've already tried them all - change to
385                  * FAILURE_RETRIES
386                  */
387                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
388                 outcome.setResult(OperationResult.FAILURE_RETRIES);
389                 outcome.setFinalOutcome(true);
390             }
391
392             return outcome;
393         };
394     }
395
396     /**
397      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
398      * was previously invoked, and thus that the "operation" is not {@code null}.
399      *
400      * @param controller controller for all of the retries
401      * @param attempt latest attempt number, starting with 1
402      * @return a function to get the next future to execute
403      */
404     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
405                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
406
407         return operation -> {
408             if (!isActorFailed(operation)) {
409                 // wrong type or wrong operation - just leave it as is
410                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
411                 controller.complete(operation);
412                 return new CompletableFuture<>();
413             }
414
415             if (getRetry(params.getRetry()) <= 0) {
416                 // no retries - already marked as FAILURE, so just return it
417                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
418                 controller.complete(operation);
419                 return new CompletableFuture<>();
420             }
421
422             /*
423              * Retry the operation.
424              */
425             long waitMs = getRetryWaitMs();
426             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
427
428             return sleep(waitMs, TimeUnit.MILLISECONDS)
429                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
430         };
431     }
432
433     /**
434      * Convenience method that starts a sleep(), running via a future.
435      *
436      * @param sleepTime time to sleep
437      * @param unit time unit
438      * @return a future that will complete when the sleep completes
439      */
440     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
441         if (sleepTime <= 0) {
442             return CompletableFuture.completedFuture(null);
443         }
444
445         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
446     }
447
448     /**
449      * Converts an exception into an operation outcome, returning a copy of the outcome to
450      * prevent background jobs from changing it.
451      *
452      * @param type type of item throwing the exception
453      * @return a function that will convert an exception into an operation outcome
454      */
455     private Function<Throwable, OperationOutcome> fromException(String type) {
456
457         return thrown -> {
458             OperationOutcome outcome = makeOutcome();
459
460             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
461                 // do not include exception in the message, as it just clutters the log
462                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
463                                 params.getRequestId());
464             } else {
465                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
466                                 params.getRequestId(), thrown);
467             }
468
469             return setOutcome(outcome, thrown);
470         };
471     }
472
473     /**
474      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
475      * any outstanding futures when one completes.
476      *
477      * @param futureMakers function to make a future. If the function returns
478      *        {@code null}, then no future is created for that function. On the other
479      *        hand, if the function throws an exception, then the previously created
480      *        functions are canceled and the exception is re-thrown
481      * @return a future to cancel or await an outcome, or {@code null} if no futures were
482      *         created. If this future is canceled, then all of the futures will be
483      *         canceled
484      */
485     public CompletableFuture<OperationOutcome> anyOf(
486                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
487
488         return anyOf(Arrays.asList(futureMakers));
489     }
490
491     /**
492      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
493      * any outstanding futures when one completes.
494      *
495      * @param futureMakers function to make a future. If the function returns
496      *        {@code null}, then no future is created for that function. On the other
497      *        hand, if the function throws an exception, then the previously created
498      *        functions are canceled and the exception is re-thrown
499      * @return a future to cancel or await an outcome, or {@code null} if no futures were
500      *         created. If this future is canceled, then all of the futures will be
501      *         canceled. Similarly, when this future completes, any incomplete futures
502      *         will be canceled
503      */
504     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
505
506         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
507
508         CompletableFuture<OperationOutcome>[] futures =
509                         attachFutures(controller, futureMakers, UnaryOperator.identity());
510
511         if (futures.length == 0) {
512             // no futures were started
513             return null;
514         }
515
516         if (futures.length == 1) {
517             return futures[0];
518         }
519
520         CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast)
521                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
522
523         return controller;
524     }
525
526     /**
527      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
528      *
529      * @param futureMakers function to make a future. If the function returns
530      *        {@code null}, then no future is created for that function. On the other
531      *        hand, if the function throws an exception, then the previously created
532      *        functions are canceled and the exception is re-thrown
533      * @return a future to cancel or await an outcome, or {@code null} if no futures were
534      *         created. If this future is canceled, then all of the futures will be
535      *         canceled
536      */
537     public CompletableFuture<OperationOutcome> allOf(
538                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
539
540         return allOf(Arrays.asList(futureMakers));
541     }
542
543     /**
544      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
545      *
546      * @param futureMakers function to make a future. If the function returns
547      *        {@code null}, then no future is created for that function. On the other
548      *        hand, if the function throws an exception, then the previously created
549      *        functions are canceled and the exception is re-thrown
550      * @return a future to cancel or await an outcome, or {@code null} if no futures were
551      *         created. If this future is canceled, then all of the futures will be
552      *         canceled. Similarly, when this future completes, any incomplete futures
553      *         will be canceled
554      */
555     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
556         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
557
558         Queue<OperationOutcome> outcomes = new LinkedList<>();
559
560         CompletableFuture<OperationOutcome>[] futures =
561                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
562                             synchronized (outcomes) {
563                                 outcomes.add(outcome);
564                             }
565                             return outcome;
566                         }));
567
568         if (futures.length == 0) {
569             // no futures were started
570             return null;
571         }
572
573         if (futures.length == 1) {
574             return futures[0];
575         }
576
577         // @formatter:off
578         CompletableFuture.allOf(futures)
579                         .thenApply(unused -> combineOutcomes(outcomes))
580                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
581         // @formatter:on
582
583         return controller;
584     }
585
586     /**
587      * Invokes the functions to create the futures and attaches them to the controller.
588      *
589      * @param controller master controller for all of the futures
590      * @param futureMakers futures to be attached to the controller
591      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
592      *        Returns the adorned future
593      * @return an array of futures, possibly zero-length. If the array is of size one,
594      *         then that one item should be returned instead of the controller
595      */
596     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
597                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
598                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
599
600         if (futureMakers.isEmpty()) {
601             @SuppressWarnings("unchecked")
602             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
603             return result;
604         }
605
606         // the last, unadorned future that is created
607         CompletableFuture<OperationOutcome> lastFuture = null;
608
609         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
610
611         // make each future
612         for (var maker : futureMakers) {
613             try {
614                 CompletableFuture<OperationOutcome> future = maker.get();
615                 if (future == null) {
616                     continue;
617                 }
618
619                 // propagate "stop" to the future
620                 controller.add(future);
621
622                 futures.add(adorn.apply(future));
623
624                 lastFuture = future;
625
626             } catch (RuntimeException e) {
627                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
628                 controller.cancel(false);
629                 throw e;
630             }
631         }
632
633         @SuppressWarnings("unchecked")
634         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
635
636         if (result.length == 1) {
637             // special case - return the unadorned future
638             result[0] = lastFuture;
639             return result;
640         }
641
642         return futures.toArray(result);
643     }
644
645     /**
646      * Combines the outcomes from a set of tasks.
647      *
648      * @param outcomes outcomes to be examined
649      * @return the combined outcome
650      */
651     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
652
653         // identify the outcome with the highest priority
654         OperationOutcome outcome = outcomes.remove();
655         int priority = detmPriority(outcome);
656
657         for (OperationOutcome outcome2 : outcomes) {
658             int priority2 = detmPriority(outcome2);
659
660             if (priority2 > priority) {
661                 outcome = outcome2;
662                 priority = priority2;
663             }
664         }
665
666         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
667                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
668
669         return outcome;
670     }
671
672     /**
673      * Determines the priority of an outcome based on its result.
674      *
675      * @param outcome outcome to examine, or {@code null}
676      * @return the outcome's priority
677      */
678     protected int detmPriority(OperationOutcome outcome) {
679         if (outcome == null || outcome.getResult() == null) {
680             return 1;
681         }
682
683         switch (outcome.getResult()) {
684             case SUCCESS:
685                 return 0;
686
687             case FAILURE_GUARD:
688                 return 2;
689
690             case FAILURE_RETRIES:
691                 return 3;
692
693             case FAILURE:
694                 return 4;
695
696             case FAILURE_TIMEOUT:
697                 return 5;
698
699             case FAILURE_EXCEPTION:
700             default:
701                 return 6;
702         }
703     }
704
705     /**
706      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
707      * not created until the previous task completes. The pipeline returns the outcome of
708      * the last task executed.
709      *
710      * @param futureMakers functions to make the futures
711      * @return a future to cancel the sequence or await the outcome
712      */
713     public CompletableFuture<OperationOutcome> sequence(
714                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
715
716         return sequence(Arrays.asList(futureMakers));
717     }
718
719     /**
720      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
721      * not created until the previous task completes. The pipeline returns the outcome of
722      * the last task executed.
723      *
724      * @param futureMakers functions to make the futures
725      * @return a future to cancel the sequence or await the outcome, or {@code null} if
726      *         there were no tasks to perform
727      */
728     public CompletableFuture<OperationOutcome> sequence(
729                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
730
731         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
732
733         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
734         if (nextTask == null) {
735             // no tasks
736             return null;
737         }
738
739         if (queue.isEmpty()) {
740             // only one task - just return it rather than wrapping it in a controller
741             return nextTask;
742         }
743
744         /*
745          * multiple tasks - need a controller to stop whichever task is currently
746          * executing
747          */
748         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
749         final Executor executor = params.getExecutor();
750
751         // @formatter:off
752         controller.wrap(nextTask)
753                     .thenCompose(nextTaskOnSuccess(controller, queue))
754                     .whenCompleteAsync(controller.delayedComplete(), executor);
755         // @formatter:on
756
757         return controller;
758     }
759
760     /**
761      * Executes the next task in the queue, if the previous outcome was successful.
762      *
763      * @param controller pipeline controller
764      * @param taskQueue queue of tasks to be performed
765      * @return a future to execute the remaining tasks, or the current outcome, if it's a
766      *         failure, or if there are no more tasks
767      */
768     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
769                     PipelineControllerFuture<OperationOutcome> controller,
770                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
771
772         return outcome -> {
773             if (!isSuccess(outcome)) {
774                 // return the failure
775                 return CompletableFuture.completedFuture(outcome);
776             }
777
778             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
779             if (nextTask == null) {
780                 // no tasks - just return the success
781                 return CompletableFuture.completedFuture(outcome);
782             }
783
784             // @formatter:off
785             return controller
786                         .wrap(nextTask)
787                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
788             // @formatter:on
789         };
790     }
791
792     /**
793      * Gets the next task from the queue, skipping those that are {@code null}.
794      *
795      * @param taskQueue task queue
796      * @return the next task, or {@code null} if the queue is now empty
797      */
798     private CompletableFuture<OperationOutcome> getNextTask(
799                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
800
801         Supplier<CompletableFuture<OperationOutcome>> maker;
802
803         while ((maker = taskQueue.poll()) != null) {
804             CompletableFuture<OperationOutcome> future = maker.get();
805             if (future != null) {
806                 return future;
807             }
808         }
809
810         return null;
811     }
812
813     /**
814      * Sets the start time of the operation and invokes the callback to indicate that the
815      * operation has started. Does nothing if the pipeline has been stopped.
816      * <p/>
817      * This assumes that the "outcome" is not {@code null}.
818      *
819      * @param callbacks used to determine if the start callback can be invoked
820      * @return a function that sets the start time and invokes the callback
821      */
822     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
823
824         return (outcome, thrown) -> {
825
826             if (callbacks.canStart()) {
827                 outcome.setSubRequestId(getSubRequestId());
828                 outcome.setStart(callbacks.getStartTime());
829                 outcome.setEnd(null);
830
831                 // pass a copy to the callback
832                 OperationOutcome outcome2 = new OperationOutcome(outcome);
833                 outcome2.setFinalOutcome(false);
834                 params.callbackStarted(outcome2);
835             }
836         };
837     }
838
839     /**
840      * Sets the end time of the operation and invokes the callback to indicate that the
841      * operation has completed. Does nothing if the pipeline has been stopped.
842      * <p/>
843      * This assumes that the "outcome" is not {@code null}.
844      * <p/>
845      * Note: the start time must be a reference rather than a plain value, because it's
846      * value must be gotten on-demand, when the returned function is executed at a later
847      * time.
848      *
849      * @param callbacks used to determine if the end callback can be invoked
850      * @return a function that sets the end time and invokes the callback
851      */
852     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
853
854         return (outcome, thrown) -> {
855             if (callbacks.canEnd()) {
856                 outcome.setSubRequestId(getSubRequestId());
857                 outcome.setStart(callbacks.getStartTime());
858                 outcome.setEnd(callbacks.getEndTime());
859
860                 // pass a copy to the callback
861                 params.callbackCompleted(new OperationOutcome(outcome));
862             }
863         };
864     }
865
866     /**
867      * Sets an operation's outcome and message, based on a throwable.
868      *
869      * @param operation operation to be updated
870      * @return the updated operation
871      */
872     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
873         OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
874                 : OperationResult.FAILURE_EXCEPTION);
875         return setOutcome(operation, result);
876     }
877
878     /**
879      * Sets an operation's outcome and default message based on the result.
880      *
881      * @param operation operation to be updated
882      * @param result result of the operation
883      * @return the updated operation
884      */
885     public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
886         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
887         operation.setResult(result);
888         operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
889                         : ControlLoopOperation.FAILED_MSG);
890
891         return operation;
892     }
893
894     /**
895      * Makes an outcome, populating the "target" field with the contents of the target
896      * entity property.
897      *
898      * @return a new operation outcome
899      */
900     protected OperationOutcome makeOutcome() {
901         OperationOutcome outcome = params.makeOutcome();
902         outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY));
903         return outcome;
904     }
905
906     /**
907      * Determines if a throwable is due to a timeout.
908      *
909      * @param thrown throwable of interest
910      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
911      */
912     protected boolean isTimeout(Throwable thrown) {
913         if (thrown instanceof CompletionException) {
914             thrown = thrown.getCause();
915         }
916
917         return (thrown instanceof TimeoutException);
918     }
919
920     /**
921      * Logs a message. If the message is not of type, String, then it attempts to
922      * pretty-print it into JSON before logging.
923      *
924      * @param direction IN or OUT
925      * @param infra communication infrastructure on which it was published
926      * @param source source name (e.g., the URL or Topic name)
927      * @param message message to be logged
928      * @return the JSON text that was logged
929      */
930     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
931         String json;
932         try {
933             json = prettyPrint(message);
934
935         } catch (IllegalArgumentException e) {
936             String type = (direction == EventType.IN ? "response" : "request");
937             logger.warn("cannot pretty-print {}", type, e);
938             json = message.toString();
939         }
940
941         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
942
943         return json;
944     }
945
946     /**
947      * Converts a message to a "pretty-printed" String using the operation's normal
948      * serialization provider (i.e., it's <i>coder</i>).
949      *
950      * @param message response to be logged
951      * @return the JSON text that was logged
952      * @throws IllegalArgumentException if the message cannot be converted
953      */
954     public <T> String prettyPrint(T message) {
955         if (message == null) {
956             return null;
957         } else if (message instanceof String) {
958             return message.toString();
959         } else {
960             try {
961                 return getCoder().encode(message, true);
962             } catch (CoderException e) {
963                 throw new IllegalArgumentException("cannot encode message", e);
964             }
965         }
966     }
967
968     // these may be overridden by subclasses or junit tests
969
970     /**
971      * Gets the retry count.
972      *
973      * @param retry retry, extracted from the parameters, or {@code null}
974      * @return the number of retries, or {@code 0} if no retries were specified
975      */
976     protected int getRetry(Integer retry) {
977         return (retry == null ? 0 : retry);
978     }
979
980     /**
981      * Gets the retry wait, in milliseconds.
982      *
983      * @return the retry wait, in milliseconds
984      */
985     protected long getRetryWaitMs() {
986         return DEFAULT_RETRY_WAIT_MS;
987     }
988
989     /**
990      * Gets the operation timeout.
991      *
992      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
993      *        {@code null}
994      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
995      *         specified
996      */
997     protected long getTimeoutMs(Integer timeoutSec) {
998         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
999     }
1000
1001     // these may be overridden by junit tests
1002
1003     protected Coder getCoder() {
1004         return coder;
1005     }
1006 }