More sonars in models
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.UUID;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.CompletionException;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.function.BiConsumer;
39 import java.util.function.Function;
40 import java.util.function.Supplier;
41 import java.util.function.UnaryOperator;
42 import lombok.AccessLevel;
43 import lombok.Getter;
44 import lombok.Setter;
45 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
46 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
48 import org.onap.policy.common.utils.coder.Coder;
49 import org.onap.policy.common.utils.coder.CoderException;
50 import org.onap.policy.common.utils.coder.StandardCoder;
51 import org.onap.policy.controlloop.ControlLoopOperation;
52 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
53 import org.onap.policy.controlloop.actorserviceprovider.Operation;
54 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
59 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * Partial implementation of an operator. In general, it's preferable that subclasses
65  * would override {@link #startOperationAsync(int, OperationOutcome)
66  * startOperationAsync()}. However, if that proves to be too difficult, then they can
67  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}.
68  * <p/>
69  * The futures returned by the methods within this class can be canceled, and will
70  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
71  * returned by overridden methods will do the same. Of course, if a class overrides
72  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
73  * be done to cancel that particular operation.
74  * <p/>
75  * In general tasks in a pipeline are executed by the same thread. However, the following
76  * should always be executed via the executor specified in "params":
77  * <ul>
78  * <li>start callback</li>
79  * <li>completion callback</li>
80  * <li>controller completion (i.e., delayedComplete())</li>
81  * </ul>
82  */
83 public abstract class OperationPartial implements Operation {
84     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
85     private static final Coder coder = new StandardCoder();
86
87     public static final String GUARD_ACTOR_NAME = "GUARD";
88     public static final String GUARD_OPERATION_NAME = "Decision";
89     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
90
91     private final OperatorConfig config;
92
93     /**
94      * Operation parameters.
95      */
96     protected final ControlLoopOperationParams params;
97
98     @Getter
99     private final String fullName;
100
101     @Getter
102     @Setter(AccessLevel.PROTECTED)
103     private String subRequestId;
104
105     @Getter
106     private final List<String> propertyNames;
107
108     /**
109      * Values for the properties identified by {@link #getPropertyNames()}.
110      */
111     private final Map<String, Object> properties = new HashMap<>();
112
113
114     /**
115      * Constructs the object.
116      *
117      * @param params operation parameters
118      * @param config configuration for this operation
119      * @param propertyNames names of properties required by this operation
120      */
121     protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
122         this.params = params;
123         this.config = config;
124         this.fullName = params.getActor() + "." + params.getOperation();
125         this.propertyNames = propertyNames;
126     }
127
128     public Executor getBlockingExecutor() {
129         return config.getBlockingExecutor();
130     }
131
132     @Override
133     public String getActorName() {
134         return params.getActor();
135     }
136
137     @Override
138     public String getName() {
139         return params.getOperation();
140     }
141
142     @Override
143     public boolean containsProperty(String name) {
144         return properties.containsKey(name);
145     }
146
147     @Override
148     public void setProperty(String name, Object value) {
149         logger.info("{}: set property {}={}", getFullName(), name, value);
150         properties.put(name, value);
151     }
152
153     @SuppressWarnings("unchecked")
154     @Override
155     public <T> T getProperty(String name) {
156         return (T) properties.get(name);
157     }
158
159     /**
160      * Gets a property value, throwing an exception if it's missing.
161      *
162      * @param name property name
163      * @param propertyType property type, used in an error message if the property value
164      *        is {@code null}
165      * @return the property value
166      */
167     @SuppressWarnings("unchecked")
168     protected <T> T getRequiredProperty(String name, String propertyType) {
169         T value = (T) properties.get(name);
170         if (value == null) {
171             throw new IllegalStateException("missing " + propertyType);
172         }
173
174         return value;
175     }
176
177     @Override
178     public CompletableFuture<OperationOutcome> start() {
179         // allocate a controller for the entire operation
180         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
181
182         // start attempt #1
183         return startOperationAttempt(controller, 1);
184     }
185
186     /**
187      * Starts the operation attempt. When all retries complete, it will complete the
188      * controller.
189      *
190      * @param controller controller for all operation attempts
191      * @param attempt attempt number, typically starting with 1
192      * @return a future that will return the final result of all attempts
193      */
194     private CompletableFuture<OperationOutcome> startOperationAttempt(
195                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
196
197         generateSubRequestId(attempt);
198
199         // propagate "stop" to the operation attempt
200         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
201                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
202
203         return controller;
204     }
205
206     /**
207      * Generates and sets {@link #subRequestId} to a new subrequest ID.
208      *
209      * @param attempt attempt number, typically starting with 1
210      */
211     public void generateSubRequestId(int attempt) {
212         // Note: this should be "protected", but that makes junits much messier
213
214         setSubRequestId(UUID.randomUUID().toString());
215     }
216
217     /**
218      * Starts the operation attempt, without doing any retries.
219      *
220      * @param params operation parameters
221      * @param attempt attempt number, typically starting with 1
222      * @return a future that will return the result of a single operation attempt
223      */
224     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
225
226         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
227
228         final Executor executor = params.getExecutor();
229         final OperationOutcome outcome = makeOutcome();
230         final CallbackManager callbacks = new CallbackManager();
231
232         // this operation attempt gets its own controller
233         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
234
235         // propagate "stop" to the callbacks
236         controller.add(callbacks);
237
238         // @formatter:off
239         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
240                         .whenCompleteAsync(callbackStarted(callbacks), executor)
241                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
242         // @formatter:on
243
244         // handle timeouts, if specified
245         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
246         if (timeoutMillis > 0) {
247             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
248             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
249         }
250
251         /*
252          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
253          * before callbackCompleted() is invoked.
254          *
255          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
256          * the pipeline as the last step anyway.
257          */
258
259         // @formatter:off
260         future.exceptionally(fromException("operation"))
261                     .thenApply(setRetryFlag(attempt))
262                     .whenCompleteAsync(callbackStarted(callbacks), executor)
263                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
264                     .whenCompleteAsync(controller.delayedComplete(), executor);
265         // @formatter:on
266
267         return controller;
268     }
269
270     /**
271      * Determines if the outcome was successful.
272      *
273      * @param outcome outcome to examine
274      * @return {@code true} if the outcome was successful
275      */
276     protected boolean isSuccess(OperationOutcome outcome) {
277         return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
278     }
279
280     /**
281      * Determines if the outcome was a failure for this operator.
282      *
283      * @param outcome outcome to examine, or {@code null}
284      * @return {@code true} if the outcome is not {@code null} and was a failure
285      *         <i>and</i> was associated with this operator, {@code false} otherwise
286      */
287     protected boolean isActorFailed(OperationOutcome outcome) {
288         return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
289     }
290
291     /**
292      * Determines if the given outcome is for this operation.
293      *
294      * @param outcome outcome to examine
295      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
296      */
297     protected boolean isSameOperation(OperationOutcome outcome) {
298         return OperationOutcome.isFor(outcome, getActorName(), getName());
299     }
300
301     /**
302      * Invokes the operation as a "future". This method simply invokes
303      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
304      * returning the result via a "future".
305      * <p/>
306      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
307      * the executor in the "params", as that may bring the background thread pool to a
308      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
309      * instead.
310      * <p/>
311      * This method assumes the following:
312      * <ul>
313      * <li>the operator is alive</li>
314      * <li>verifyRunning() has been invoked</li>
315      * <li>callbackStarted() has been invoked</li>
316      * <li>the invoker will perform appropriate timeout checks</li>
317      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
318      * </ul>
319      *
320      * @param attempt attempt number, typically starting with 1
321      * @return a function that will start the operation and return its result when
322      *         complete
323      */
324     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
325
326         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
327     }
328
329     /**
330      * Low-level method that performs the operation. This can make the same assumptions
331      * that are made by {@link #doOperationAsFuture()}. This particular method simply
332      * throws an {@link UnsupportedOperationException}.
333      *
334      * @param attempt attempt number, typically starting with 1
335      * @param operation the operation being performed
336      * @return the outcome of the operation
337      */
338     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
339
340         throw new UnsupportedOperationException("start operation " + getFullName());
341     }
342
343     /**
344      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
345      * FAILURE, assuming the policy specifies retries and the retry count has been
346      * exhausted.
347      *
348      * @param attempt latest attempt number, starting with 1
349      * @return a function to get the next future to execute
350      */
351     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
352
353         return origOutcome -> {
354             // ensure we have a non-null outcome
355             OperationOutcome outcome;
356             if (origOutcome != null) {
357                 outcome = origOutcome;
358             } else {
359                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
360                 outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE);
361             }
362
363             // ensure correct actor/operation
364             outcome.setActor(getActorName());
365             outcome.setOperation(getName());
366
367             // determine if we should retry, based on the result
368             if (outcome.getResult() != OperationResult.FAILURE) {
369                 // do not retry success or other failure types (e.g., exception)
370                 outcome.setFinalOutcome(true);
371                 return outcome;
372             }
373
374             int retry = getRetry(params.getRetry());
375             if (retry <= 0) {
376                 // no retries were specified
377                 outcome.setFinalOutcome(true);
378
379             } else if (attempt <= retry) {
380                 // have more retries - not the final outcome
381                 outcome.setFinalOutcome(false);
382
383             } else {
384                 /*
385                  * retries were specified and we've already tried them all - change to
386                  * FAILURE_RETRIES
387                  */
388                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
389                 outcome.setResult(OperationResult.FAILURE_RETRIES);
390                 outcome.setFinalOutcome(true);
391             }
392
393             return outcome;
394         };
395     }
396
397     /**
398      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
399      * was previously invoked, and thus that the "operation" is not {@code null}.
400      *
401      * @param controller controller for all of the retries
402      * @param attempt latest attempt number, starting with 1
403      * @return a function to get the next future to execute
404      */
405     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
406                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
407
408         return operation -> {
409             if (!isActorFailed(operation)) {
410                 // wrong type or wrong operation - just leave it as is
411                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
412                 controller.complete(operation);
413                 return new CompletableFuture<>();
414             }
415
416             if (getRetry(params.getRetry()) <= 0) {
417                 // no retries - already marked as FAILURE, so just return it
418                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
419                 controller.complete(operation);
420                 return new CompletableFuture<>();
421             }
422
423             /*
424              * Retry the operation.
425              */
426             long waitMs = getRetryWaitMs();
427             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
428
429             return sleep(waitMs, TimeUnit.MILLISECONDS)
430                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
431         };
432     }
433
434     /**
435      * Convenience method that starts a sleep(), running via a future.
436      *
437      * @param sleepTime time to sleep
438      * @param unit time unit
439      * @return a future that will complete when the sleep completes
440      */
441     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
442         if (sleepTime <= 0) {
443             return CompletableFuture.completedFuture(null);
444         }
445
446         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
447     }
448
449     /**
450      * Converts an exception into an operation outcome, returning a copy of the outcome to
451      * prevent background jobs from changing it.
452      *
453      * @param type type of item throwing the exception
454      * @return a function that will convert an exception into an operation outcome
455      */
456     private Function<Throwable, OperationOutcome> fromException(String type) {
457
458         return thrown -> {
459             OperationOutcome outcome = makeOutcome();
460
461             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
462                 // do not include exception in the message, as it just clutters the log
463                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
464                                 params.getRequestId());
465             } else {
466                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
467                                 params.getRequestId(), thrown);
468             }
469
470             return setOutcome(outcome, thrown);
471         };
472     }
473
474     /**
475      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
476      * any outstanding futures when one completes.
477      *
478      * @param futureMakers function to make a future. If the function returns
479      *        {@code null}, then no future is created for that function. On the other
480      *        hand, if the function throws an exception, then the previously created
481      *        functions are canceled and the exception is re-thrown
482      * @return a future to cancel or await an outcome, or {@code null} if no futures were
483      *         created. If this future is canceled, then all of the futures will be
484      *         canceled
485      */
486     public CompletableFuture<OperationOutcome> anyOf(
487                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
488
489         return anyOf(Arrays.asList(futureMakers));
490     }
491
492     /**
493      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
494      * any outstanding futures when one completes.
495      *
496      * @param futureMakers function to make a future. If the function returns
497      *        {@code null}, then no future is created for that function. On the other
498      *        hand, if the function throws an exception, then the previously created
499      *        functions are canceled and the exception is re-thrown
500      * @return a future to cancel or await an outcome, or {@code null} if no futures were
501      *         created. If this future is canceled, then all of the futures will be
502      *         canceled. Similarly, when this future completes, any incomplete futures
503      *         will be canceled
504      */
505     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
506
507         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
508
509         CompletableFuture<OperationOutcome>[] futures =
510                         attachFutures(controller, futureMakers, UnaryOperator.identity());
511
512         if (futures.length == 0) {
513             // no futures were started
514             return null;
515         }
516
517         if (futures.length == 1) {
518             return futures[0];
519         }
520
521         CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast)
522                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
523
524         return controller;
525     }
526
527     /**
528      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
529      *
530      * @param futureMakers function to make a future. If the function returns
531      *        {@code null}, then no future is created for that function. On the other
532      *        hand, if the function throws an exception, then the previously created
533      *        functions are canceled and the exception is re-thrown
534      * @return a future to cancel or await an outcome, or {@code null} if no futures were
535      *         created. If this future is canceled, then all of the futures will be
536      *         canceled
537      */
538     public CompletableFuture<OperationOutcome> allOf(
539                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
540
541         return allOf(Arrays.asList(futureMakers));
542     }
543
544     /**
545      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
546      *
547      * @param futureMakers function to make a future. If the function returns
548      *        {@code null}, then no future is created for that function. On the other
549      *        hand, if the function throws an exception, then the previously created
550      *        functions are canceled and the exception is re-thrown
551      * @return a future to cancel or await an outcome, or {@code null} if no futures were
552      *         created. If this future is canceled, then all of the futures will be
553      *         canceled. Similarly, when this future completes, any incomplete futures
554      *         will be canceled
555      */
556     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
557         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
558
559         Queue<OperationOutcome> outcomes = new LinkedList<>();
560
561         CompletableFuture<OperationOutcome>[] futures =
562                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
563                             synchronized (outcomes) {
564                                 outcomes.add(outcome);
565                             }
566                             return outcome;
567                         }));
568
569         if (futures.length == 0) {
570             // no futures were started
571             return null;
572         }
573
574         if (futures.length == 1) {
575             return futures[0];
576         }
577
578         // @formatter:off
579         CompletableFuture.allOf(futures)
580                         .thenApply(unused -> combineOutcomes(outcomes))
581                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
582         // @formatter:on
583
584         return controller;
585     }
586
587     /**
588      * Invokes the functions to create the futures and attaches them to the controller.
589      *
590      * @param controller master controller for all of the futures
591      * @param futureMakers futures to be attached to the controller
592      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
593      *        Returns the adorned future
594      * @return an array of futures, possibly zero-length. If the array is of size one,
595      *         then that one item should be returned instead of the controller
596      */
597     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
598                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
599                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
600
601         if (futureMakers.isEmpty()) {
602             @SuppressWarnings("unchecked")
603             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
604             return result;
605         }
606
607         // the last, unadorned future that is created
608         CompletableFuture<OperationOutcome> lastFuture = null;
609
610         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
611
612         // make each future
613         for (var maker : futureMakers) {
614             try {
615                 CompletableFuture<OperationOutcome> future = maker.get();
616                 if (future == null) {
617                     continue;
618                 }
619
620                 // propagate "stop" to the future
621                 controller.add(future);
622
623                 futures.add(adorn.apply(future));
624
625                 lastFuture = future;
626
627             } catch (RuntimeException e) {
628                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
629                 controller.cancel(false);
630                 throw e;
631             }
632         }
633
634         @SuppressWarnings("unchecked")
635         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
636
637         if (result.length == 1) {
638             // special case - return the unadorned future
639             result[0] = lastFuture;
640             return result;
641         }
642
643         return futures.toArray(result);
644     }
645
646     /**
647      * Combines the outcomes from a set of tasks.
648      *
649      * @param outcomes outcomes to be examined
650      * @return the combined outcome
651      */
652     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
653
654         // identify the outcome with the highest priority
655         OperationOutcome outcome = outcomes.remove();
656         int priority = detmPriority(outcome);
657
658         for (OperationOutcome outcome2 : outcomes) {
659             int priority2 = detmPriority(outcome2);
660
661             if (priority2 > priority) {
662                 outcome = outcome2;
663                 priority = priority2;
664             }
665         }
666
667         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
668                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
669
670         return outcome;
671     }
672
673     /**
674      * Determines the priority of an outcome based on its result.
675      *
676      * @param outcome outcome to examine, or {@code null}
677      * @return the outcome's priority
678      */
679     protected int detmPriority(OperationOutcome outcome) {
680         if (outcome == null || outcome.getResult() == null) {
681             return 1;
682         }
683
684         switch (outcome.getResult()) {
685             case SUCCESS:
686                 return 0;
687
688             case FAILURE_GUARD:
689                 return 2;
690
691             case FAILURE_RETRIES:
692                 return 3;
693
694             case FAILURE:
695                 return 4;
696
697             case FAILURE_TIMEOUT:
698                 return 5;
699
700             case FAILURE_EXCEPTION:
701             default:
702                 return 6;
703         }
704     }
705
706     /**
707      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
708      * not created until the previous task completes. The pipeline returns the outcome of
709      * the last task executed.
710      *
711      * @param futureMakers functions to make the futures
712      * @return a future to cancel the sequence or await the outcome
713      */
714     public CompletableFuture<OperationOutcome> sequence(
715                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
716
717         return sequence(Arrays.asList(futureMakers));
718     }
719
720     /**
721      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
722      * not created until the previous task completes. The pipeline returns the outcome of
723      * the last task executed.
724      *
725      * @param futureMakers functions to make the futures
726      * @return a future to cancel the sequence or await the outcome, or {@code null} if
727      *         there were no tasks to perform
728      */
729     public CompletableFuture<OperationOutcome> sequence(
730                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
731
732         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
733
734         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
735         if (nextTask == null) {
736             // no tasks
737             return null;
738         }
739
740         if (queue.isEmpty()) {
741             // only one task - just return it rather than wrapping it in a controller
742             return nextTask;
743         }
744
745         /*
746          * multiple tasks - need a controller to stop whichever task is currently
747          * executing
748          */
749         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
750         final Executor executor = params.getExecutor();
751
752         // @formatter:off
753         controller.wrap(nextTask)
754                     .thenCompose(nextTaskOnSuccess(controller, queue))
755                     .whenCompleteAsync(controller.delayedComplete(), executor);
756         // @formatter:on
757
758         return controller;
759     }
760
761     /**
762      * Executes the next task in the queue, if the previous outcome was successful.
763      *
764      * @param controller pipeline controller
765      * @param taskQueue queue of tasks to be performed
766      * @return a future to execute the remaining tasks, or the current outcome, if it's a
767      *         failure, or if there are no more tasks
768      */
769     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
770                     PipelineControllerFuture<OperationOutcome> controller,
771                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
772
773         return outcome -> {
774             if (!isSuccess(outcome)) {
775                 // return the failure
776                 return CompletableFuture.completedFuture(outcome);
777             }
778
779             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
780             if (nextTask == null) {
781                 // no tasks - just return the success
782                 return CompletableFuture.completedFuture(outcome);
783             }
784
785             // @formatter:off
786             return controller
787                         .wrap(nextTask)
788                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
789             // @formatter:on
790         };
791     }
792
793     /**
794      * Gets the next task from the queue, skipping those that are {@code null}.
795      *
796      * @param taskQueue task queue
797      * @return the next task, or {@code null} if the queue is now empty
798      */
799     private CompletableFuture<OperationOutcome> getNextTask(
800                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
801
802         Supplier<CompletableFuture<OperationOutcome>> maker;
803
804         while ((maker = taskQueue.poll()) != null) {
805             CompletableFuture<OperationOutcome> future = maker.get();
806             if (future != null) {
807                 return future;
808             }
809         }
810
811         return null;
812     }
813
814     /**
815      * Sets the start time of the operation and invokes the callback to indicate that the
816      * operation has started. Does nothing if the pipeline has been stopped.
817      * <p/>
818      * This assumes that the "outcome" is not {@code null}.
819      *
820      * @param callbacks used to determine if the start callback can be invoked
821      * @return a function that sets the start time and invokes the callback
822      */
823     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
824
825         return (outcome, thrown) -> {
826
827             if (callbacks.canStart()) {
828                 outcome.setSubRequestId(getSubRequestId());
829                 outcome.setStart(callbacks.getStartTime());
830                 outcome.setEnd(null);
831
832                 // pass a copy to the callback
833                 OperationOutcome outcome2 = new OperationOutcome(outcome);
834                 outcome2.setFinalOutcome(false);
835                 params.callbackStarted(outcome2);
836             }
837         };
838     }
839
840     /**
841      * Sets the end time of the operation and invokes the callback to indicate that the
842      * operation has completed. Does nothing if the pipeline has been stopped.
843      * <p/>
844      * This assumes that the "outcome" is not {@code null}.
845      * <p/>
846      * Note: the start time must be a reference rather than a plain value, because it's
847      * value must be gotten on-demand, when the returned function is executed at a later
848      * time.
849      *
850      * @param callbacks used to determine if the end callback can be invoked
851      * @return a function that sets the end time and invokes the callback
852      */
853     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
854
855         return (outcome, thrown) -> {
856             if (callbacks.canEnd()) {
857                 outcome.setSubRequestId(getSubRequestId());
858                 outcome.setStart(callbacks.getStartTime());
859                 outcome.setEnd(callbacks.getEndTime());
860
861                 // pass a copy to the callback
862                 params.callbackCompleted(new OperationOutcome(outcome));
863             }
864         };
865     }
866
867     /**
868      * Sets an operation's outcome and message, based on a throwable.
869      *
870      * @param operation operation to be updated
871      * @return the updated operation
872      */
873     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
874         OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
875                 : OperationResult.FAILURE_EXCEPTION);
876         return setOutcome(operation, result);
877     }
878
879     /**
880      * Sets an operation's outcome and default message based on the result.
881      *
882      * @param operation operation to be updated
883      * @param result result of the operation
884      * @return the updated operation
885      */
886     public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
887         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
888         operation.setResult(result);
889         operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
890                         : ControlLoopOperation.FAILED_MSG);
891
892         return operation;
893     }
894
895     /**
896      * Makes an outcome, populating the "target" field with the contents of the target
897      * entity property.
898      *
899      * @return a new operation outcome
900      */
901     protected OperationOutcome makeOutcome() {
902         OperationOutcome outcome = params.makeOutcome();
903         outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY));
904         return outcome;
905     }
906
907     /**
908      * Determines if a throwable is due to a timeout.
909      *
910      * @param thrown throwable of interest
911      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
912      */
913     protected boolean isTimeout(Throwable thrown) {
914         if (thrown instanceof CompletionException) {
915             thrown = thrown.getCause();
916         }
917
918         return (thrown instanceof TimeoutException);
919     }
920
921     /**
922      * Logs a message. If the message is not of type, String, then it attempts to
923      * pretty-print it into JSON before logging.
924      *
925      * @param direction IN or OUT
926      * @param infra communication infrastructure on which it was published
927      * @param source source name (e.g., the URL or Topic name)
928      * @param message message to be logged
929      * @return the JSON text that was logged
930      */
931     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
932         String json;
933         try {
934             json = prettyPrint(message);
935
936         } catch (IllegalArgumentException e) {
937             String type = (direction == EventType.IN ? "response" : "request");
938             logger.warn("cannot pretty-print {}", type, e);
939             json = message.toString();
940         }
941
942         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
943
944         return json;
945     }
946
947     /**
948      * Converts a message to a "pretty-printed" String using the operation's normal
949      * serialization provider (i.e., it's <i>coder</i>).
950      *
951      * @param message response to be logged
952      * @return the JSON text that was logged
953      * @throws IllegalArgumentException if the message cannot be converted
954      */
955     public <T> String prettyPrint(T message) {
956         if (message == null) {
957             return null;
958         } else if (message instanceof String) {
959             return message.toString();
960         } else {
961             try {
962                 return getCoder().encode(message, true);
963             } catch (CoderException e) {
964                 throw new IllegalArgumentException("cannot encode message", e);
965             }
966         }
967     }
968
969     // these may be overridden by subclasses or junit tests
970
971     /**
972      * Gets the retry count.
973      *
974      * @param retry retry, extracted from the parameters, or {@code null}
975      * @return the number of retries, or {@code 0} if no retries were specified
976      */
977     protected int getRetry(Integer retry) {
978         return (retry == null ? 0 : retry);
979     }
980
981     /**
982      * Gets the retry wait, in milliseconds.
983      *
984      * @return the retry wait, in milliseconds
985      */
986     protected long getRetryWaitMs() {
987         return DEFAULT_RETRY_WAIT_MS;
988     }
989
990     /**
991      * Gets the operation timeout.
992      *
993      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
994      *        {@code null}
995      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
996      *         specified
997      */
998     protected long getTimeoutMs(Integer timeoutSec) {
999         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1000     }
1001
1002     // these may be overridden by junit tests
1003
1004     protected Coder getCoder() {
1005         return coder;
1006     }
1007 }