Add property lists to Actors
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedHashMap;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Queue;
32 import java.util.UUID;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CompletionException;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39 import java.util.function.BiConsumer;
40 import java.util.function.Function;
41 import java.util.function.Supplier;
42 import java.util.function.UnaryOperator;
43 import lombok.AccessLevel;
44 import lombok.Getter;
45 import lombok.Setter;
46 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
48 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
49 import org.onap.policy.common.utils.coder.Coder;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.controlloop.ControlLoopOperation;
53 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
54 import org.onap.policy.controlloop.actorserviceprovider.Operation;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
56 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
58 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
59 import org.onap.policy.controlloop.policy.PolicyResult;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * Partial implementation of an operator. In general, it's preferable that subclasses
65  * would override {@link #startOperationAsync(int, OperationOutcome)
66  * startOperationAsync()}. However, if that proves to be too difficult, then they can
67  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
68  * if the operation requires any preprocessor steps, the subclass may choose to override
69  * {@link #startPreprocessorAsync()}.
70  * <p/>
71  * The futures returned by the methods within this class can be canceled, and will
72  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
73  * returned by overridden methods will do the same. Of course, if a class overrides
74  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
75  * be done to cancel that particular operation.
76  * <p/>
77  * In general tasks in a pipeline are executed by the same thread. However, the following
78  * should always be executed via the executor specified in "params":
79  * <ul>
80  * <li>start callback</li>
81  * <li>completion callback</li>
82  * <li>controller completion (i.e., delayedComplete())</li>
83  * </ul>
84  */
85 public abstract class OperationPartial implements Operation {
    // Logger shared by all instances of this class.
    private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
    // NOTE(review): not referenced in the visible part of this class; presumably used
    // by helper methods further down to encode messages - confirm.
    private static final Coder coder = new StandardCoder();

    // Actor and operation names that startGuardAsync() plugs into the guard request.
    public static final String GUARD_ACTOR_NAME = "GUARD";
    public static final String GUARD_OPERATION_NAME = "Decision";
    // NOTE(review): presumably the default wait between retries, in milliseconds; its
    // consumer is not visible here - confirm.
    public static final long DEFAULT_RETRY_WAIT_MS = 1000L;

    // Configuration supplied to the constructor; source of the blocking executor.
    private final OperatorConfig config;

    /**
     * Operation parameters, as supplied to the constructor.
     */
    protected final ControlLoopOperationParams params;

    // "<actor>.<operation>", computed once by the constructor.
    @Getter
    private final String fullName;

    // Replaced with a random UUID by generateSubRequestId() at the start of each attempt.
    @Getter
    @Setter(AccessLevel.PROTECTED)
    private String subRequestId;

    // Names of the properties required by this operation, as passed to the constructor.
    @Getter
    private final List<String> propertyNames;

    /**
     * Values for the properties identified by {@link #getPropertyNames()}, keyed by
     * property name. Populated via {@link #setProperty(String, Object)}.
     */
    private final Map<String, Object> properties = new HashMap<>();
114
115
116     /**
117      * Constructs the object.
118      *
119      * @param params operation parameters
120      * @param config configuration for this operation
121      * @param propertyNames names of properties required by this operation
122      */
123     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
124         this.params = params;
125         this.config = config;
126         this.fullName = params.getActor() + "." + params.getOperation();
127         this.propertyNames = propertyNames;
128     }
129
130     public Executor getBlockingExecutor() {
131         return config.getBlockingExecutor();
132     }
133
134     public String getActorName() {
135         return params.getActor();
136     }
137
138     public String getName() {
139         return params.getOperation();
140     }
141
142     /**
143      * Sets a property.
144      *
145      * @param name property name
146      * @param value new value
147      */
148     public void setProperty(String name, Object value) {
149         properties.put(name, value);
150     }
151
152     /**
153      * Gets a property's value.
154      *
155      * @param name name of the property of interest
156      * @return the property's value, or {@code null} if it has no value
157      */
158     @SuppressWarnings("unchecked")
159     public <T> T getProperty(String name) {
160         return (T) properties.get(name);
161     }
162
163     @Override
164     public CompletableFuture<OperationOutcome> start() {
165         // allocate a controller for the entire operation
166         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
167
168         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
169         if (preproc == null) {
170             // no preprocessor required - just start the operation
171             return startOperationAttempt(controller, 1);
172         }
173
174         /*
175          * Do preprocessor first and then, if successful, start the operation. Note:
176          * operations create their own outcome, ignoring the outcome from any previous
177          * steps.
178          *
179          * Wrap the preprocessor to ensure "stop" is propagated to it.
180          */
181         // @formatter:off
182         controller.wrap(preproc)
183                         .exceptionally(fromException("preprocessor of operation"))
184                         .thenCompose(handlePreprocessorFailure(controller))
185                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
186                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
187         // @formatter:on
188
189         return controller;
190     }
191
192     /**
193      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
194      * invokes the call-backs, marks the controller complete, and returns an incomplete
195      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
196      * received.
197      * <p/>
198      * Assumes that no callbacks have been invoked yet.
199      *
200      * @param controller pipeline controller
201      * @return a function that checks the outcome status and continues, if successful, or
202      *         indicates a failure otherwise
203      */
204     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
205                     PipelineControllerFuture<OperationOutcome> controller) {
206
207         return outcome -> {
208
209             if (isSuccess(outcome)) {
210                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
211                 return CompletableFuture.completedFuture(outcome);
212             }
213
214             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
215
216             final Executor executor = params.getExecutor();
217             final CallbackManager callbacks = new CallbackManager();
218
219             // propagate "stop" to the callbacks
220             controller.add(callbacks);
221
222             final OperationOutcome outcome2 = params.makeOutcome();
223
224             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
225
226             outcome2.setFinalOutcome(true);
227             outcome2.setResult(PolicyResult.FAILURE_GUARD);
228             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
229
230             // @formatter:off
231             CompletableFuture.completedFuture(outcome2)
232                             .whenCompleteAsync(callbackStarted(callbacks), executor)
233                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
234                             .whenCompleteAsync(controller.delayedComplete(), executor);
235             // @formatter:on
236
237             return new CompletableFuture<>();
238         };
239     }
240
241     /**
242      * Invokes the operation's preprocessor step(s) as a "future". This method simply
243      * returns {@code null}.
244      * <p/>
245      * This method assumes the following:
246      * <ul>
247      * <li>the operator is alive</li>
248      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
249      * </ul>
250      *
251      * @return a function that will start the preprocessor and returns its outcome, or
252      *         {@code null} if this operation needs no preprocessor
253      */
254     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
255         return null;
256     }
257
258     /**
259      * Invokes the operation's guard step(s) as a "future".
260      * <p/>
261      * This method assumes the following:
262      * <ul>
263      * <li>the operator is alive</li>
264      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
265      * </ul>
266      *
267      * @return a function that will start the guard checks and returns its outcome, or
268      *         {@code null} if this operation has no guard
269      */
270     protected CompletableFuture<OperationOutcome> startGuardAsync() {
271         // get the guard payload
272         Map<String, Object> payload = makeGuardPayload();
273
274         /*
275          * Note: can't use constants from actor.guard, because that would create a
276          * circular dependency.
277          */
278         return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
279                         .payload(payload).build().start();
280     }
281
282     /**
283      * Creates a payload to execute a guard operation.
284      *
285      * @return a new guard payload
286      */
287     protected Map<String, Object> makeGuardPayload() {
288         Map<String, Object> guard = new LinkedHashMap<>();
289         guard.put("actor", params.getActor());
290         guard.put("operation", params.getOperation());
291         guard.put("target", params.getTargetEntity());
292         guard.put("requestId", params.getRequestId());
293
294         String clname = params.getContext().getEvent().getClosedLoopControlName();
295         if (clname != null) {
296             guard.put("clname", clname);
297         }
298
299         return guard;
300     }
301
302     /**
303      * Starts the operation attempt, with no preprocessor. When all retries complete, it
304      * will complete the controller.
305      *
306      * @param controller controller for all operation attempts
307      * @param attempt attempt number, typically starting with 1
308      * @return a future that will return the final result of all attempts
309      */
310     private CompletableFuture<OperationOutcome> startOperationAttempt(
311                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
312
313         generateSubRequestId(attempt);
314
315         // propagate "stop" to the operation attempt
316         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
317                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
318
319         return controller;
320     }
321
322     /**
323      * Generates and sets {@link #subRequestId} to a new subrequest ID.
324      * @param attempt attempt number, typically starting with 1
325      */
326     public void generateSubRequestId(int attempt) {
327         // Note: this should be "protected", but that makes junits much messier
328
329         setSubRequestId(UUID.randomUUID().toString());
330     }
331
332     /**
333      * Starts the operation attempt, without doing any retries.
334      *
335      * @param params operation parameters
336      * @param attempt attempt number, typically starting with 1
337      * @return a future that will return the result of a single operation attempt
338      */
339     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
340
341         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
342
343         final Executor executor = params.getExecutor();
344         final OperationOutcome outcome = params.makeOutcome();
345         final CallbackManager callbacks = new CallbackManager();
346
347         // this operation attempt gets its own controller
348         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
349
350         // propagate "stop" to the callbacks
351         controller.add(callbacks);
352
353         // @formatter:off
354         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
355                         .whenCompleteAsync(callbackStarted(callbacks), executor)
356                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
357         // @formatter:on
358
359         // handle timeouts, if specified
360         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
361         if (timeoutMillis > 0) {
362             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
363             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
364         }
365
366         /*
367          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
368          * before callbackCompleted() is invoked.
369          *
370          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
371          * the pipeline as the last step anyway.
372          */
373
374         // @formatter:off
375         future.exceptionally(fromException("operation"))
376                     .thenApply(setRetryFlag(attempt))
377                     .whenCompleteAsync(callbackStarted(callbacks), executor)
378                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
379                     .whenCompleteAsync(controller.delayedComplete(), executor);
380         // @formatter:on
381
382         return controller;
383     }
384
385     /**
386      * Determines if the outcome was successful.
387      *
388      * @param outcome outcome to examine
389      * @return {@code true} if the outcome was successful
390      */
391     protected boolean isSuccess(OperationOutcome outcome) {
392         return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
393     }
394
395     /**
396      * Determines if the outcome was a failure for this operator.
397      *
398      * @param outcome outcome to examine, or {@code null}
399      * @return {@code true} if the outcome is not {@code null} and was a failure
400      *         <i>and</i> was associated with this operator, {@code false} otherwise
401      */
402     protected boolean isActorFailed(OperationOutcome outcome) {
403         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
404     }
405
406     /**
407      * Determines if the given outcome is for this operation.
408      *
409      * @param outcome outcome to examine
410      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
411      */
412     protected boolean isSameOperation(OperationOutcome outcome) {
413         return OperationOutcome.isFor(outcome, getActorName(), getName());
414     }
415
416     /**
417      * Invokes the operation as a "future". This method simply invokes
418      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
419      * returning the result via a "future".
420      * <p/>
421      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
422      * the executor in the "params", as that may bring the background thread pool to a
423      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
424      * instead.
425      * <p/>
426      * This method assumes the following:
427      * <ul>
428      * <li>the operator is alive</li>
429      * <li>verifyRunning() has been invoked</li>
430      * <li>callbackStarted() has been invoked</li>
431      * <li>the invoker will perform appropriate timeout checks</li>
432      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
433      * </ul>
434      *
435      * @param attempt attempt number, typically starting with 1
436      * @return a function that will start the operation and return its result when
437      *         complete
438      */
439     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
440
441         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
442     }
443
444     /**
445      * Low-level method that performs the operation. This can make the same assumptions
446      * that are made by {@link #doOperationAsFuture()}. This particular method simply
447      * throws an {@link UnsupportedOperationException}.
448      *
449      * @param attempt attempt number, typically starting with 1
450      * @param operation the operation being performed
451      * @return the outcome of the operation
452      */
453     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
454
455         throw new UnsupportedOperationException("start operation " + getFullName());
456     }
457
458     /**
459      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
460      * FAILURE, assuming the policy specifies retries and the retry count has been
461      * exhausted.
462      *
463      * @param attempt latest attempt number, starting with 1
464      * @return a function to get the next future to execute
465      */
466     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
467
468         return origOutcome -> {
469             // ensure we have a non-null outcome
470             OperationOutcome outcome;
471             if (origOutcome != null) {
472                 outcome = origOutcome;
473             } else {
474                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
475                 outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
476             }
477
478             // ensure correct actor/operation
479             outcome.setActor(getActorName());
480             outcome.setOperation(getName());
481
482             // determine if we should retry, based on the result
483             if (outcome.getResult() != PolicyResult.FAILURE) {
484                 // do not retry success or other failure types (e.g., exception)
485                 outcome.setFinalOutcome(true);
486                 return outcome;
487             }
488
489             int retry = getRetry(params.getRetry());
490             if (retry <= 0) {
491                 // no retries were specified
492                 outcome.setFinalOutcome(true);
493
494             } else if (attempt <= retry) {
495                 // have more retries - not the final outcome
496                 outcome.setFinalOutcome(false);
497
498             } else {
499                 /*
500                  * retries were specified and we've already tried them all - change to
501                  * FAILURE_RETRIES
502                  */
503                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
504                 outcome.setResult(PolicyResult.FAILURE_RETRIES);
505                 outcome.setFinalOutcome(true);
506             }
507
508             return outcome;
509         };
510     }
511
512     /**
513      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
514      * was previously invoked, and thus that the "operation" is not {@code null}.
515      *
516      * @param controller controller for all of the retries
517      * @param attempt latest attempt number, starting with 1
518      * @return a function to get the next future to execute
519      */
520     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
521                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
522
523         return operation -> {
524             if (!isActorFailed(operation)) {
525                 // wrong type or wrong operation - just leave it as is
526                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
527                 controller.complete(operation);
528                 return new CompletableFuture<>();
529             }
530
531             if (getRetry(params.getRetry()) <= 0) {
532                 // no retries - already marked as FAILURE, so just return it
533                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
534                 controller.complete(operation);
535                 return new CompletableFuture<>();
536             }
537
538             /*
539              * Retry the operation.
540              */
541             long waitMs = getRetryWaitMs();
542             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
543
544             return sleep(waitMs, TimeUnit.MILLISECONDS)
545                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
546         };
547     }
548
549     /**
550      * Convenience method that starts a sleep(), running via a future.
551      *
552      * @param sleepTime time to sleep
553      * @param unit time unit
554      * @return a future that will complete when the sleep completes
555      */
556     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
557         if (sleepTime <= 0) {
558             return CompletableFuture.completedFuture(null);
559         }
560
561         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
562     }
563
564     /**
565      * Converts an exception into an operation outcome, returning a copy of the outcome to
566      * prevent background jobs from changing it.
567      *
568      * @param type type of item throwing the exception
569      * @return a function that will convert an exception into an operation outcome
570      */
571     private Function<Throwable, OperationOutcome> fromException(String type) {
572
573         return thrown -> {
574             OperationOutcome outcome = params.makeOutcome();
575
576             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
577                 // do not include exception in the message, as it just clutters the log
578                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
579                                 params.getRequestId());
580             } else {
581                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
582                                 params.getRequestId(), thrown);
583             }
584
585             return setOutcome(outcome, thrown);
586         };
587     }
588
589     /**
590      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
591      * any outstanding futures when one completes.
592      *
593      * @param futureMakers function to make a future. If the function returns
594      *        {@code null}, then no future is created for that function. On the other
595      *        hand, if the function throws an exception, then the previously created
596      *        functions are canceled and the exception is re-thrown
597      * @return a future to cancel or await an outcome, or {@code null} if no futures were
598      *         created. If this future is canceled, then all of the futures will be
599      *         canceled
600      */
601     public CompletableFuture<OperationOutcome> anyOf(
602                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
603
604         return anyOf(Arrays.asList(futureMakers));
605     }
606
607     /**
608      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
609      * any outstanding futures when one completes.
610      *
611      * @param futureMakers function to make a future. If the function returns
612      *        {@code null}, then no future is created for that function. On the other
613      *        hand, if the function throws an exception, then the previously created
614      *        functions are canceled and the exception is re-thrown
615      * @return a future to cancel or await an outcome, or {@code null} if no futures were
616      *         created. If this future is canceled, then all of the futures will be
617      *         canceled. Similarly, when this future completes, any incomplete futures
618      *         will be canceled
619      */
620     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
621
622         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
623
624         CompletableFuture<OperationOutcome>[] futures =
625                         attachFutures(controller, futureMakers, UnaryOperator.identity());
626
627         if (futures.length == 0) {
628             // no futures were started
629             return null;
630         }
631
632         if (futures.length == 1) {
633             return futures[0];
634         }
635
636         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
637                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
638
639         return controller;
640     }
641
642     /**
643      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
644      *
645      * @param futureMakers function to make a future. If the function returns
646      *        {@code null}, then no future is created for that function. On the other
647      *        hand, if the function throws an exception, then the previously created
648      *        functions are canceled and the exception is re-thrown
649      * @return a future to cancel or await an outcome, or {@code null} if no futures were
650      *         created. If this future is canceled, then all of the futures will be
651      *         canceled
652      */
653     public CompletableFuture<OperationOutcome> allOf(
654                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
655
656         return allOf(Arrays.asList(futureMakers));
657     }
658
659     /**
660      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
661      *
662      * @param futureMakers function to make a future. If the function returns
663      *        {@code null}, then no future is created for that function. On the other
664      *        hand, if the function throws an exception, then the previously created
665      *        functions are canceled and the exception is re-thrown
666      * @return a future to cancel or await an outcome, or {@code null} if no futures were
667      *         created. If this future is canceled, then all of the futures will be
668      *         canceled. Similarly, when this future completes, any incomplete futures
669      *         will be canceled
670      */
671     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
672         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
673
674         Queue<OperationOutcome> outcomes = new LinkedList<>();
675
676         CompletableFuture<OperationOutcome>[] futures =
677                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
678                             synchronized (outcomes) {
679                                 outcomes.add(outcome);
680                             }
681                             return outcome;
682                         }));
683
684         if (futures.length == 0) {
685             // no futures were started
686             return null;
687         }
688
689         if (futures.length == 1) {
690             return futures[0];
691         }
692
693         // @formatter:off
694         CompletableFuture.allOf(futures)
695                         .thenApply(unused -> combineOutcomes(outcomes))
696                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
697         // @formatter:on
698
699         return controller;
700     }
701
702     /**
703      * Invokes the functions to create the futures and attaches them to the controller.
704      *
705      * @param controller master controller for all of the futures
706      * @param futureMakers futures to be attached to the controller
707      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
708      *        Returns the adorned future
709      * @return an array of futures, possibly zero-length. If the array is of size one,
710      *         then that one item should be returned instead of the controller
711      */
712     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
713                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
714                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
715
716         if (futureMakers.isEmpty()) {
717             @SuppressWarnings("unchecked")
718             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
719             return result;
720         }
721
722         // the last, unadorned future that is created
723         CompletableFuture<OperationOutcome> lastFuture = null;
724
725         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
726
727         // make each future
728         for (var maker : futureMakers) {
729             try {
730                 CompletableFuture<OperationOutcome> future = maker.get();
731                 if (future == null) {
732                     continue;
733                 }
734
735                 // propagate "stop" to the future
736                 controller.add(future);
737
738                 futures.add(adorn.apply(future));
739
740                 lastFuture = future;
741
742             } catch (RuntimeException e) {
743                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
744                 controller.cancel(false);
745                 throw e;
746             }
747         }
748
749         @SuppressWarnings("unchecked")
750         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
751
752         if (result.length == 1) {
753             // special case - return the unadorned future
754             result[0] = lastFuture;
755             return result;
756         }
757
758         return futures.toArray(result);
759     }
760
761     /**
762      * Combines the outcomes from a set of tasks.
763      *
764      * @param outcomes outcomes to be examined
765      * @return the combined outcome
766      */
767     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
768
769         // identify the outcome with the highest priority
770         OperationOutcome outcome = outcomes.remove();
771         int priority = detmPriority(outcome);
772
773         for (OperationOutcome outcome2 : outcomes) {
774             int priority2 = detmPriority(outcome2);
775
776             if (priority2 > priority) {
777                 outcome = outcome2;
778                 priority = priority2;
779             }
780         }
781
782         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
783                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
784
785         return outcome;
786     }
787
788     /**
789      * Determines the priority of an outcome based on its result.
790      *
791      * @param outcome outcome to examine, or {@code null}
792      * @return the outcome's priority
793      */
794     protected int detmPriority(OperationOutcome outcome) {
795         if (outcome == null || outcome.getResult() == null) {
796             return 1;
797         }
798
799         switch (outcome.getResult()) {
800             case SUCCESS:
801                 return 0;
802
803             case FAILURE_GUARD:
804                 return 2;
805
806             case FAILURE_RETRIES:
807                 return 3;
808
809             case FAILURE:
810                 return 4;
811
812             case FAILURE_TIMEOUT:
813                 return 5;
814
815             case FAILURE_EXCEPTION:
816             default:
817                 return 6;
818         }
819     }
820
821     /**
822      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
823      * not created until the previous task completes. The pipeline returns the outcome of
824      * the last task executed.
825      *
826      * @param futureMakers functions to make the futures
827      * @return a future to cancel the sequence or await the outcome
828      */
829     public CompletableFuture<OperationOutcome> sequence(
830                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
831
832         return sequence(Arrays.asList(futureMakers));
833     }
834
835     /**
836      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
837      * not created until the previous task completes. The pipeline returns the outcome of
838      * the last task executed.
839      *
840      * @param futureMakers functions to make the futures
841      * @return a future to cancel the sequence or await the outcome, or {@code null} if
842      *         there were no tasks to perform
843      */
844     public CompletableFuture<OperationOutcome> sequence(
845                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
846
847         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
848
849         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
850         if (nextTask == null) {
851             // no tasks
852             return null;
853         }
854
855         if (queue.isEmpty()) {
856             // only one task - just return it rather than wrapping it in a controller
857             return nextTask;
858         }
859
860         /*
861          * multiple tasks - need a controller to stop whichever task is currently
862          * executing
863          */
864         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
865         final Executor executor = params.getExecutor();
866
867         // @formatter:off
868         controller.wrap(nextTask)
869                     .thenCompose(nextTaskOnSuccess(controller, queue))
870                     .whenCompleteAsync(controller.delayedComplete(), executor);
871         // @formatter:on
872
873         return controller;
874     }
875
876     /**
877      * Executes the next task in the queue, if the previous outcome was successful.
878      *
879      * @param controller pipeline controller
880      * @param taskQueue queue of tasks to be performed
881      * @return a future to execute the remaining tasks, or the current outcome, if it's a
882      *         failure, or if there are no more tasks
883      */
884     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
885                     PipelineControllerFuture<OperationOutcome> controller,
886                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
887
888         return outcome -> {
889             if (!isSuccess(outcome)) {
890                 // return the failure
891                 return CompletableFuture.completedFuture(outcome);
892             }
893
894             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
895             if (nextTask == null) {
896                 // no tasks - just return the success
897                 return CompletableFuture.completedFuture(outcome);
898             }
899
900             // @formatter:off
901             return controller
902                         .wrap(nextTask)
903                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
904             // @formatter:on
905         };
906     }
907
908     /**
909      * Gets the next task from the queue, skipping those that are {@code null}.
910      *
911      * @param taskQueue task queue
912      * @return the next task, or {@code null} if the queue is now empty
913      */
914     private CompletableFuture<OperationOutcome> getNextTask(
915                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
916
917         Supplier<CompletableFuture<OperationOutcome>> maker;
918
919         while ((maker = taskQueue.poll()) != null) {
920             CompletableFuture<OperationOutcome> future = maker.get();
921             if (future != null) {
922                 return future;
923             }
924         }
925
926         return null;
927     }
928
929     /**
930      * Sets the start time of the operation and invokes the callback to indicate that the
931      * operation has started. Does nothing if the pipeline has been stopped.
932      * <p/>
933      * This assumes that the "outcome" is not {@code null}.
934      *
935      * @param callbacks used to determine if the start callback can be invoked
936      * @return a function that sets the start time and invokes the callback
937      */
938     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
939
940         return (outcome, thrown) -> {
941
942             if (callbacks.canStart()) {
943                 outcome.setSubRequestId(getSubRequestId());
944                 outcome.setStart(callbacks.getStartTime());
945                 outcome.setEnd(null);
946
947                 // pass a copy to the callback
948                 OperationOutcome outcome2 = new OperationOutcome(outcome);
949                 outcome2.setFinalOutcome(false);
950                 params.callbackStarted(outcome2);
951             }
952         };
953     }
954
955     /**
956      * Sets the end time of the operation and invokes the callback to indicate that the
957      * operation has completed. Does nothing if the pipeline has been stopped.
958      * <p/>
959      * This assumes that the "outcome" is not {@code null}.
960      * <p/>
961      * Note: the start time must be a reference rather than a plain value, because it's
962      * value must be gotten on-demand, when the returned function is executed at a later
963      * time.
964      *
965      * @param callbacks used to determine if the end callback can be invoked
966      * @return a function that sets the end time and invokes the callback
967      */
968     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
969
970         return (outcome, thrown) -> {
971             if (callbacks.canEnd()) {
972                 outcome.setSubRequestId(getSubRequestId());
973                 outcome.setStart(callbacks.getStartTime());
974                 outcome.setEnd(callbacks.getEndTime());
975
976                 // pass a copy to the callback
977                 params.callbackCompleted(new OperationOutcome(outcome));
978             }
979         };
980     }
981
982     /**
983      * Sets an operation's outcome and message, based on a throwable.
984      *
985      * @param operation operation to be updated
986      * @return the updated operation
987      */
988     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
989         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
990         return setOutcome(operation, result);
991     }
992
993     /**
994      * Sets an operation's outcome and default message based on the result.
995      *
996      * @param operation operation to be updated
997      * @param result result of the operation
998      * @return the updated operation
999      */
1000     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
1001         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
1002         operation.setResult(result);
1003         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
1004                         : ControlLoopOperation.FAILED_MSG);
1005
1006         return operation;
1007     }
1008
1009     /**
1010      * Determines if a throwable is due to a timeout.
1011      *
1012      * @param thrown throwable of interest
1013      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
1014      */
1015     protected boolean isTimeout(Throwable thrown) {
1016         if (thrown instanceof CompletionException) {
1017             thrown = thrown.getCause();
1018         }
1019
1020         return (thrown instanceof TimeoutException);
1021     }
1022
1023     /**
1024      * Logs a message. If the message is not of type, String, then it attempts to
1025      * pretty-print it into JSON before logging.
1026      *
1027      * @param direction IN or OUT
1028      * @param infra communication infrastructure on which it was published
1029      * @param source source name (e.g., the URL or Topic name)
1030      * @param message message to be logged
1031      * @return the JSON text that was logged
1032      */
1033     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
1034         String json;
1035         try {
1036             json = prettyPrint(message);
1037
1038         } catch (IllegalArgumentException e) {
1039             String type = (direction == EventType.IN ? "response" : "request");
1040             logger.warn("cannot pretty-print {}", type, e);
1041             json = message.toString();
1042         }
1043
1044         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1045
1046         return json;
1047     }
1048
1049     /**
1050      * Converts a message to a "pretty-printed" String using the operation's normal
1051      * serialization provider (i.e., it's <i>coder</i>).
1052      *
1053      * @param message response to be logged
1054      * @return the JSON text that was logged
1055      * @throws IllegalArgumentException if the message cannot be converted
1056      */
1057     public <T> String prettyPrint(T message) {
1058         if (message == null) {
1059             return null;
1060         } else if (message instanceof String) {
1061             return message.toString();
1062         } else {
1063             try {
1064                 return getCoder().encode(message, true);
1065             } catch (CoderException e) {
1066                 throw new IllegalArgumentException("cannot encode message", e);
1067             }
1068         }
1069     }
1070
1071     // these may be overridden by subclasses or junit tests
1072
1073     /**
1074      * Gets the retry count.
1075      *
1076      * @param retry retry, extracted from the parameters, or {@code null}
1077      * @return the number of retries, or {@code 0} if no retries were specified
1078      */
1079     protected int getRetry(Integer retry) {
1080         return (retry == null ? 0 : retry);
1081     }
1082
1083     /**
1084      * Gets the retry wait, in milliseconds.
1085      *
1086      * @return the retry wait, in milliseconds
1087      */
1088     protected long getRetryWaitMs() {
1089         return DEFAULT_RETRY_WAIT_MS;
1090     }
1091
1092     /**
1093      * Gets the operation timeout.
1094      *
1095      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1096      *        {@code null}
1097      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1098      *         specified
1099      */
1100     protected long getTimeoutMs(Integer timeoutSec) {
1101         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1102     }
1103
1104     // these may be overridden by junit tests
1105
1106     protected Coder getCoder() {
1107         return coder;
1108     }
1109 }