Merge "Upgrade new tcagen2 policy type"
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedHashMap;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Queue;
32 import java.util.UUID;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CompletionException;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39 import java.util.function.BiConsumer;
40 import java.util.function.Function;
41 import java.util.function.Supplier;
42 import java.util.function.UnaryOperator;
43 import lombok.AccessLevel;
44 import lombok.Getter;
45 import lombok.Setter;
46 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
48 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
49 import org.onap.policy.common.utils.coder.Coder;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.controlloop.ControlLoopOperation;
53 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
54 import org.onap.policy.controlloop.actorserviceprovider.Operation;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
59 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
60 import org.onap.policy.controlloop.policy.PolicyResult;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  * Partial implementation of an operator. In general, it's preferable that subclasses
66  * would override {@link #startOperationAsync(int, OperationOutcome)
67  * startOperationAsync()}. However, if that proves to be too difficult, then they can
68  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
69  * if the operation requires any preprocessor steps, the subclass may choose to override
70  * {@link #startPreprocessorAsync()}.
71  * <p/>
72  * The futures returned by the methods within this class can be canceled, and will
73  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
74  * returned by overridden methods will do the same. Of course, if a class overrides
75  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
76  * be done to cancel that particular operation.
77  * <p/>
78  * In general tasks in a pipeline are executed by the same thread. However, the following
79  * should always be executed via the executor specified in "params":
80  * <ul>
81  * <li>start callback</li>
82  * <li>completion callback</li>
83  * <li>controller completion (i.e., delayedComplete())</li>
84  * </ul>
85  */
86 public abstract class OperationPartial implements Operation {
87     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
88     private static final Coder coder = new StandardCoder();
89
90     public static final String GUARD_ACTOR_NAME = "GUARD";
91     public static final String GUARD_OPERATION_NAME = "Decision";
92     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
93
94     private final OperatorConfig config;
95
96     /**
97      * Operation parameters.
98      */
99     protected final ControlLoopOperationParams params;
100
101     @Getter
102     private final String fullName;
103
104     @Getter
105     @Setter(AccessLevel.PROTECTED)
106     private String subRequestId;
107
108     @Getter
109     private final List<String> propertyNames;
110
111     /**
112      * Values for the properties identified by {@link #getPropertyNames()}.
113      */
114     private final Map<String, Object> properties = new HashMap<>();
115
116
117     /**
118      * Constructs the object.
119      *
120      * @param params operation parameters
121      * @param config configuration for this operation
122      * @param propertyNames names of properties required by this operation
123      */
124     public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
125         this.params = params;
126         this.config = config;
127         this.fullName = params.getActor() + "." + params.getOperation();
128         this.propertyNames = propertyNames;
129     }
130
131     public Executor getBlockingExecutor() {
132         return config.getBlockingExecutor();
133     }
134
135     public String getActorName() {
136         return params.getActor();
137     }
138
139     public String getName() {
140         return params.getOperation();
141     }
142
143     @Override
144     public boolean containsProperty(String name) {
145         return properties.containsKey(name);
146     }
147
148     @Override
149     public void setProperty(String name, Object value) {
150         logger.info("{}: set property {}={}", getFullName(), name, value);
151         properties.put(name, value);
152     }
153
154     @SuppressWarnings("unchecked")
155     @Override
156     public <T> T getProperty(String name) {
157         return (T) properties.get(name);
158     }
159
160     @Override
161     public CompletableFuture<OperationOutcome> start() {
162         // allocate a controller for the entire operation
163         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
164
165         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
166         if (preproc == null) {
167             // no preprocessor required - just start the operation
168             return startOperationAttempt(controller, 1);
169         }
170
171         /*
172          * Do preprocessor first and then, if successful, start the operation. Note:
173          * operations create their own outcome, ignoring the outcome from any previous
174          * steps.
175          *
176          * Wrap the preprocessor to ensure "stop" is propagated to it.
177          */
178         // @formatter:off
179         controller.wrap(preproc)
180                         .exceptionally(fromException("preprocessor of operation"))
181                         .thenCompose(handlePreprocessorFailure(controller))
182                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
183                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
184         // @formatter:on
185
186         return controller;
187     }
188
189     /**
190      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
191      * invokes the call-backs, marks the controller complete, and returns an incomplete
192      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
193      * received.
194      * <p/>
195      * Assumes that no callbacks have been invoked yet.
196      *
197      * @param controller pipeline controller
198      * @return a function that checks the outcome status and continues, if successful, or
199      *         indicates a failure otherwise
200      */
201     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
202                     PipelineControllerFuture<OperationOutcome> controller) {
203
204         return outcome -> {
205
206             if (isSuccess(outcome)) {
207                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
208                 return CompletableFuture.completedFuture(outcome);
209             }
210
211             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
212
213             final Executor executor = params.getExecutor();
214             final CallbackManager callbacks = new CallbackManager();
215
216             // propagate "stop" to the callbacks
217             controller.add(callbacks);
218
219             final OperationOutcome outcome2 = params.makeOutcome(getTargetEntity());
220
221             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
222
223             outcome2.setFinalOutcome(true);
224             outcome2.setResult(PolicyResult.FAILURE_GUARD);
225             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
226
227             // @formatter:off
228             CompletableFuture.completedFuture(outcome2)
229                             .whenCompleteAsync(callbackStarted(callbacks), executor)
230                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
231                             .whenCompleteAsync(controller.delayedComplete(), executor);
232             // @formatter:on
233
234             return new CompletableFuture<>();
235         };
236     }
237
238     /**
239      * Invokes the operation's preprocessor step(s) as a "future". This method simply
240      * returns {@code null}.
241      * <p/>
242      * This method assumes the following:
243      * <ul>
244      * <li>the operator is alive</li>
245      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
246      * </ul>
247      *
248      * @return a function that will start the preprocessor and returns its outcome, or
249      *         {@code null} if this operation needs no preprocessor
250      */
251     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
252         return null;
253     }
254
255     /**
256      * Invokes the operation's guard step(s) as a "future".
257      * <p/>
258      * This method assumes the following:
259      * <ul>
260      * <li>the operator is alive</li>
261      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
262      * </ul>
263      *
264      * @return a function that will start the guard checks and returns its outcome, or
265      *         {@code null} if this operation has no guard
266      */
267     protected CompletableFuture<OperationOutcome> startGuardAsync() {
268         if (params.isPreprocessed()) {
269             return null;
270         }
271
272         // get the guard payload
273         Map<String, Object> payload = makeGuardPayload();
274
275         /*
276          * Note: can't use constants from actor.guard, because that would create a
277          * circular dependency.
278          */
279         return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
280                         .payload(payload).build().start();
281     }
282
283     /**
284      * Creates a payload to execute a guard operation.
285      *
286      * @return a new guard payload
287      */
288     protected Map<String, Object> makeGuardPayload() {
289         // TODO delete this once preprocessing is done by the application
290         Map<String, Object> guard = new LinkedHashMap<>();
291         guard.put("actor", params.getActor());
292         guard.put("operation", params.getOperation());
293         guard.put("target", getTargetEntity());
294         guard.put("requestId", params.getRequestId());
295
296         String clname = params.getContext().getEvent().getClosedLoopControlName();
297         if (clname != null) {
298             guard.put("clname", clname);
299         }
300
301         return guard;
302     }
303
304     /**
305      * Starts the operation attempt, with no preprocessor. When all retries complete, it
306      * will complete the controller.
307      *
308      * @param controller controller for all operation attempts
309      * @param attempt attempt number, typically starting with 1
310      * @return a future that will return the final result of all attempts
311      */
312     private CompletableFuture<OperationOutcome> startOperationAttempt(
313                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
314
315         generateSubRequestId(attempt);
316
317         // propagate "stop" to the operation attempt
318         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
319                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
320
321         return controller;
322     }
323
324     /**
325      * Generates and sets {@link #subRequestId} to a new subrequest ID.
326      *
327      * @param attempt attempt number, typically starting with 1
328      */
329     public void generateSubRequestId(int attempt) {
330         // Note: this should be "protected", but that makes junits much messier
331
332         setSubRequestId(UUID.randomUUID().toString());
333     }
334
335     /**
336      * Starts the operation attempt, without doing any retries.
337      *
338      * @param params operation parameters
339      * @param attempt attempt number, typically starting with 1
340      * @return a future that will return the result of a single operation attempt
341      */
342     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
343
344         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
345
346         final Executor executor = params.getExecutor();
347         final OperationOutcome outcome = params.makeOutcome(getTargetEntity());
348         final CallbackManager callbacks = new CallbackManager();
349
350         // this operation attempt gets its own controller
351         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
352
353         // propagate "stop" to the callbacks
354         controller.add(callbacks);
355
356         // @formatter:off
357         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
358                         .whenCompleteAsync(callbackStarted(callbacks), executor)
359                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
360         // @formatter:on
361
362         // handle timeouts, if specified
363         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
364         if (timeoutMillis > 0) {
365             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
366             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
367         }
368
369         /*
370          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
371          * before callbackCompleted() is invoked.
372          *
373          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
374          * the pipeline as the last step anyway.
375          */
376
377         // @formatter:off
378         future.exceptionally(fromException("operation"))
379                     .thenApply(setRetryFlag(attempt))
380                     .whenCompleteAsync(callbackStarted(callbacks), executor)
381                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
382                     .whenCompleteAsync(controller.delayedComplete(), executor);
383         // @formatter:on
384
385         return controller;
386     }
387
388     /**
389      * Determines if the outcome was successful.
390      *
391      * @param outcome outcome to examine
392      * @return {@code true} if the outcome was successful
393      */
394     protected boolean isSuccess(OperationOutcome outcome) {
395         return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
396     }
397
398     /**
399      * Determines if the outcome was a failure for this operator.
400      *
401      * @param outcome outcome to examine, or {@code null}
402      * @return {@code true} if the outcome is not {@code null} and was a failure
403      *         <i>and</i> was associated with this operator, {@code false} otherwise
404      */
405     protected boolean isActorFailed(OperationOutcome outcome) {
406         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
407     }
408
409     /**
410      * Determines if the given outcome is for this operation.
411      *
412      * @param outcome outcome to examine
413      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
414      */
415     protected boolean isSameOperation(OperationOutcome outcome) {
416         return OperationOutcome.isFor(outcome, getActorName(), getName());
417     }
418
419     /**
420      * Invokes the operation as a "future". This method simply invokes
421      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
422      * returning the result via a "future".
423      * <p/>
424      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
425      * the executor in the "params", as that may bring the background thread pool to a
426      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
427      * instead.
428      * <p/>
429      * This method assumes the following:
430      * <ul>
431      * <li>the operator is alive</li>
432      * <li>verifyRunning() has been invoked</li>
433      * <li>callbackStarted() has been invoked</li>
434      * <li>the invoker will perform appropriate timeout checks</li>
435      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
436      * </ul>
437      *
438      * @param attempt attempt number, typically starting with 1
439      * @return a function that will start the operation and return its result when
440      *         complete
441      */
442     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
443
444         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
445     }
446
447     /**
448      * Low-level method that performs the operation. This can make the same assumptions
449      * that are made by {@link #doOperationAsFuture()}. This particular method simply
450      * throws an {@link UnsupportedOperationException}.
451      *
452      * @param attempt attempt number, typically starting with 1
453      * @param operation the operation being performed
454      * @return the outcome of the operation
455      */
456     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
457
458         throw new UnsupportedOperationException("start operation " + getFullName());
459     }
460
461     /**
462      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
463      * FAILURE, assuming the policy specifies retries and the retry count has been
464      * exhausted.
465      *
466      * @param attempt latest attempt number, starting with 1
467      * @return a function to get the next future to execute
468      */
469     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
470
471         return origOutcome -> {
472             // ensure we have a non-null outcome
473             OperationOutcome outcome;
474             if (origOutcome != null) {
475                 outcome = origOutcome;
476             } else {
477                 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
478                 outcome = this.setOutcome(params.makeOutcome(getTargetEntity()), PolicyResult.FAILURE);
479             }
480
481             // ensure correct actor/operation
482             outcome.setActor(getActorName());
483             outcome.setOperation(getName());
484
485             // determine if we should retry, based on the result
486             if (outcome.getResult() != PolicyResult.FAILURE) {
487                 // do not retry success or other failure types (e.g., exception)
488                 outcome.setFinalOutcome(true);
489                 return outcome;
490             }
491
492             int retry = getRetry(params.getRetry());
493             if (retry <= 0) {
494                 // no retries were specified
495                 outcome.setFinalOutcome(true);
496
497             } else if (attempt <= retry) {
498                 // have more retries - not the final outcome
499                 outcome.setFinalOutcome(false);
500
501             } else {
502                 /*
503                  * retries were specified and we've already tried them all - change to
504                  * FAILURE_RETRIES
505                  */
506                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
507                 outcome.setResult(PolicyResult.FAILURE_RETRIES);
508                 outcome.setFinalOutcome(true);
509             }
510
511             return outcome;
512         };
513     }
514
515     /**
516      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
517      * was previously invoked, and thus that the "operation" is not {@code null}.
518      *
519      * @param controller controller for all of the retries
520      * @param attempt latest attempt number, starting with 1
521      * @return a function to get the next future to execute
522      */
523     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
524                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
525
526         return operation -> {
527             if (!isActorFailed(operation)) {
528                 // wrong type or wrong operation - just leave it as is
529                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
530                 controller.complete(operation);
531                 return new CompletableFuture<>();
532             }
533
534             if (getRetry(params.getRetry()) <= 0) {
535                 // no retries - already marked as FAILURE, so just return it
536                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
537                 controller.complete(operation);
538                 return new CompletableFuture<>();
539             }
540
541             /*
542              * Retry the operation.
543              */
544             long waitMs = getRetryWaitMs();
545             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
546
547             return sleep(waitMs, TimeUnit.MILLISECONDS)
548                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
549         };
550     }
551
552     /**
553      * Convenience method that starts a sleep(), running via a future.
554      *
555      * @param sleepTime time to sleep
556      * @param unit time unit
557      * @return a future that will complete when the sleep completes
558      */
559     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
560         if (sleepTime <= 0) {
561             return CompletableFuture.completedFuture(null);
562         }
563
564         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
565     }
566
567     /**
568      * Converts an exception into an operation outcome, returning a copy of the outcome to
569      * prevent background jobs from changing it.
570      *
571      * @param type type of item throwing the exception
572      * @return a function that will convert an exception into an operation outcome
573      */
574     private Function<Throwable, OperationOutcome> fromException(String type) {
575
576         return thrown -> {
577             OperationOutcome outcome = params.makeOutcome(getTargetEntity());
578
579             if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
580                 // do not include exception in the message, as it just clutters the log
581                 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
582                                 params.getRequestId());
583             } else {
584                 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
585                                 params.getRequestId(), thrown);
586             }
587
588             return setOutcome(outcome, thrown);
589         };
590     }
591
592     /**
593      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
594      * any outstanding futures when one completes.
595      *
596      * @param futureMakers function to make a future. If the function returns
597      *        {@code null}, then no future is created for that function. On the other
598      *        hand, if the function throws an exception, then the previously created
599      *        functions are canceled and the exception is re-thrown
600      * @return a future to cancel or await an outcome, or {@code null} if no futures were
601      *         created. If this future is canceled, then all of the futures will be
602      *         canceled
603      */
604     public CompletableFuture<OperationOutcome> anyOf(
605                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
606
607         return anyOf(Arrays.asList(futureMakers));
608     }
609
610     /**
611      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
612      * any outstanding futures when one completes.
613      *
614      * @param futureMakers function to make a future. If the function returns
615      *        {@code null}, then no future is created for that function. On the other
616      *        hand, if the function throws an exception, then the previously created
617      *        functions are canceled and the exception is re-thrown
618      * @return a future to cancel or await an outcome, or {@code null} if no futures were
619      *         created. If this future is canceled, then all of the futures will be
620      *         canceled. Similarly, when this future completes, any incomplete futures
621      *         will be canceled
622      */
623     public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
624
625         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
626
627         CompletableFuture<OperationOutcome>[] futures =
628                         attachFutures(controller, futureMakers, UnaryOperator.identity());
629
630         if (futures.length == 0) {
631             // no futures were started
632             return null;
633         }
634
635         if (futures.length == 1) {
636             return futures[0];
637         }
638
639         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
640                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
641
642         return controller;
643     }
644
645     /**
646      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
647      *
648      * @param futureMakers function to make a future. If the function returns
649      *        {@code null}, then no future is created for that function. On the other
650      *        hand, if the function throws an exception, then the previously created
651      *        functions are canceled and the exception is re-thrown
652      * @return a future to cancel or await an outcome, or {@code null} if no futures were
653      *         created. If this future is canceled, then all of the futures will be
654      *         canceled
655      */
656     public CompletableFuture<OperationOutcome> allOf(
657                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
658
659         return allOf(Arrays.asList(futureMakers));
660     }
661
662     /**
663      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
664      *
665      * @param futureMakers function to make a future. If the function returns
666      *        {@code null}, then no future is created for that function. On the other
667      *        hand, if the function throws an exception, then the previously created
668      *        functions are canceled and the exception is re-thrown
669      * @return a future to cancel or await an outcome, or {@code null} if no futures were
670      *         created. If this future is canceled, then all of the futures will be
671      *         canceled. Similarly, when this future completes, any incomplete futures
672      *         will be canceled
673      */
674     public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
675         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
676
677         Queue<OperationOutcome> outcomes = new LinkedList<>();
678
679         CompletableFuture<OperationOutcome>[] futures =
680                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
681                             synchronized (outcomes) {
682                                 outcomes.add(outcome);
683                             }
684                             return outcome;
685                         }));
686
687         if (futures.length == 0) {
688             // no futures were started
689             return null;
690         }
691
692         if (futures.length == 1) {
693             return futures[0];
694         }
695
696         // @formatter:off
697         CompletableFuture.allOf(futures)
698                         .thenApply(unused -> combineOutcomes(outcomes))
699                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
700         // @formatter:on
701
702         return controller;
703     }
704
705     /**
706      * Invokes the functions to create the futures and attaches them to the controller.
707      *
708      * @param controller master controller for all of the futures
709      * @param futureMakers futures to be attached to the controller
710      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
711      *        Returns the adorned future
712      * @return an array of futures, possibly zero-length. If the array is of size one,
713      *         then that one item should be returned instead of the controller
714      */
715     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
716                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
717                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
718
719         if (futureMakers.isEmpty()) {
720             @SuppressWarnings("unchecked")
721             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
722             return result;
723         }
724
725         // the last, unadorned future that is created
726         CompletableFuture<OperationOutcome> lastFuture = null;
727
728         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
729
730         // make each future
731         for (var maker : futureMakers) {
732             try {
733                 CompletableFuture<OperationOutcome> future = maker.get();
734                 if (future == null) {
735                     continue;
736                 }
737
738                 // propagate "stop" to the future
739                 controller.add(future);
740
741                 futures.add(adorn.apply(future));
742
743                 lastFuture = future;
744
745             } catch (RuntimeException e) {
746                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
747                 controller.cancel(false);
748                 throw e;
749             }
750         }
751
752         @SuppressWarnings("unchecked")
753         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
754
755         if (result.length == 1) {
756             // special case - return the unadorned future
757             result[0] = lastFuture;
758             return result;
759         }
760
761         return futures.toArray(result);
762     }
763
764     /**
765      * Combines the outcomes from a set of tasks.
766      *
767      * @param outcomes outcomes to be examined
768      * @return the combined outcome
769      */
770     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
771
772         // identify the outcome with the highest priority
773         OperationOutcome outcome = outcomes.remove();
774         int priority = detmPriority(outcome);
775
776         for (OperationOutcome outcome2 : outcomes) {
777             int priority2 = detmPriority(outcome2);
778
779             if (priority2 > priority) {
780                 outcome = outcome2;
781                 priority = priority2;
782             }
783         }
784
785         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
786                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
787
788         return outcome;
789     }
790
791     /**
792      * Determines the priority of an outcome based on its result.
793      *
794      * @param outcome outcome to examine, or {@code null}
795      * @return the outcome's priority
796      */
797     protected int detmPriority(OperationOutcome outcome) {
798         if (outcome == null || outcome.getResult() == null) {
799             return 1;
800         }
801
802         switch (outcome.getResult()) {
803             case SUCCESS:
804                 return 0;
805
806             case FAILURE_GUARD:
807                 return 2;
808
809             case FAILURE_RETRIES:
810                 return 3;
811
812             case FAILURE:
813                 return 4;
814
815             case FAILURE_TIMEOUT:
816                 return 5;
817
818             case FAILURE_EXCEPTION:
819             default:
820                 return 6;
821         }
822     }
823
824     /**
825      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
826      * not created until the previous task completes. The pipeline returns the outcome of
827      * the last task executed.
828      *
829      * @param futureMakers functions to make the futures
830      * @return a future to cancel the sequence or await the outcome
831      */
832     public CompletableFuture<OperationOutcome> sequence(
833                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
834
835         return sequence(Arrays.asList(futureMakers));
836     }
837
838     /**
839      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
840      * not created until the previous task completes. The pipeline returns the outcome of
841      * the last task executed.
842      *
843      * @param futureMakers functions to make the futures
844      * @return a future to cancel the sequence or await the outcome, or {@code null} if
845      *         there were no tasks to perform
846      */
847     public CompletableFuture<OperationOutcome> sequence(
848                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
849
850         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
851
852         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
853         if (nextTask == null) {
854             // no tasks
855             return null;
856         }
857
858         if (queue.isEmpty()) {
859             // only one task - just return it rather than wrapping it in a controller
860             return nextTask;
861         }
862
863         /*
864          * multiple tasks - need a controller to stop whichever task is currently
865          * executing
866          */
867         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
868         final Executor executor = params.getExecutor();
869
870         // @formatter:off
871         controller.wrap(nextTask)
872                     .thenCompose(nextTaskOnSuccess(controller, queue))
873                     .whenCompleteAsync(controller.delayedComplete(), executor);
874         // @formatter:on
875
876         return controller;
877     }
878
879     /**
880      * Executes the next task in the queue, if the previous outcome was successful.
881      *
882      * @param controller pipeline controller
883      * @param taskQueue queue of tasks to be performed
884      * @return a future to execute the remaining tasks, or the current outcome, if it's a
885      *         failure, or if there are no more tasks
886      */
887     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
888                     PipelineControllerFuture<OperationOutcome> controller,
889                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
890
891         return outcome -> {
892             if (!isSuccess(outcome)) {
893                 // return the failure
894                 return CompletableFuture.completedFuture(outcome);
895             }
896
897             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
898             if (nextTask == null) {
899                 // no tasks - just return the success
900                 return CompletableFuture.completedFuture(outcome);
901             }
902
903             // @formatter:off
904             return controller
905                         .wrap(nextTask)
906                         .thenCompose(nextTaskOnSuccess(controller, taskQueue));
907             // @formatter:on
908         };
909     }
910
911     /**
912      * Gets the next task from the queue, skipping those that are {@code null}.
913      *
914      * @param taskQueue task queue
915      * @return the next task, or {@code null} if the queue is now empty
916      */
917     private CompletableFuture<OperationOutcome> getNextTask(
918                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
919
920         Supplier<CompletableFuture<OperationOutcome>> maker;
921
922         while ((maker = taskQueue.poll()) != null) {
923             CompletableFuture<OperationOutcome> future = maker.get();
924             if (future != null) {
925                 return future;
926             }
927         }
928
929         return null;
930     }
931
932     /**
933      * Sets the start time of the operation and invokes the callback to indicate that the
934      * operation has started. Does nothing if the pipeline has been stopped.
935      * <p/>
936      * This assumes that the "outcome" is not {@code null}.
937      *
938      * @param callbacks used to determine if the start callback can be invoked
939      * @return a function that sets the start time and invokes the callback
940      */
941     protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
942
943         return (outcome, thrown) -> {
944
945             if (callbacks.canStart()) {
946                 outcome.setSubRequestId(getSubRequestId());
947                 outcome.setStart(callbacks.getStartTime());
948                 outcome.setEnd(null);
949
950                 // pass a copy to the callback
951                 OperationOutcome outcome2 = new OperationOutcome(outcome);
952                 outcome2.setFinalOutcome(false);
953                 params.callbackStarted(outcome2);
954             }
955         };
956     }
957
958     /**
959      * Sets the end time of the operation and invokes the callback to indicate that the
960      * operation has completed. Does nothing if the pipeline has been stopped.
961      * <p/>
962      * This assumes that the "outcome" is not {@code null}.
963      * <p/>
964      * Note: the start time must be a reference rather than a plain value, because it's
965      * value must be gotten on-demand, when the returned function is executed at a later
966      * time.
967      *
968      * @param callbacks used to determine if the end callback can be invoked
969      * @return a function that sets the end time and invokes the callback
970      */
971     protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
972
973         return (outcome, thrown) -> {
974             if (callbacks.canEnd()) {
975                 outcome.setSubRequestId(getSubRequestId());
976                 outcome.setStart(callbacks.getStartTime());
977                 outcome.setEnd(callbacks.getEndTime());
978
979                 // pass a copy to the callback
980                 params.callbackCompleted(new OperationOutcome(outcome));
981             }
982         };
983     }
984
985     /**
986      * Sets an operation's outcome and message, based on a throwable.
987      *
988      * @param operation operation to be updated
989      * @return the updated operation
990      */
991     public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
992         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
993         return setOutcome(operation, result);
994     }
995
996     /**
997      * Sets an operation's outcome and default message based on the result.
998      *
999      * @param operation operation to be updated
1000      * @param result result of the operation
1001      * @return the updated operation
1002      */
1003     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
1004         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
1005         operation.setResult(result);
1006         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
1007                         : ControlLoopOperation.FAILED_MSG);
1008
1009         return operation;
1010     }
1011
1012     /**
1013      * Determines if a throwable is due to a timeout.
1014      *
1015      * @param thrown throwable of interest
1016      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
1017      */
1018     protected boolean isTimeout(Throwable thrown) {
1019         if (thrown instanceof CompletionException) {
1020             thrown = thrown.getCause();
1021         }
1022
1023         return (thrown instanceof TimeoutException);
1024     }
1025
1026     /**
1027      * Logs a message. If the message is not of type, String, then it attempts to
1028      * pretty-print it into JSON before logging.
1029      *
1030      * @param direction IN or OUT
1031      * @param infra communication infrastructure on which it was published
1032      * @param source source name (e.g., the URL or Topic name)
1033      * @param message message to be logged
1034      * @return the JSON text that was logged
1035      */
1036     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
1037         String json;
1038         try {
1039             json = prettyPrint(message);
1040
1041         } catch (IllegalArgumentException e) {
1042             String type = (direction == EventType.IN ? "response" : "request");
1043             logger.warn("cannot pretty-print {}", type, e);
1044             json = message.toString();
1045         }
1046
1047         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1048
1049         return json;
1050     }
1051
1052     /**
1053      * Converts a message to a "pretty-printed" String using the operation's normal
1054      * serialization provider (i.e., it's <i>coder</i>).
1055      *
1056      * @param message response to be logged
1057      * @return the JSON text that was logged
1058      * @throws IllegalArgumentException if the message cannot be converted
1059      */
1060     public <T> String prettyPrint(T message) {
1061         if (message == null) {
1062             return null;
1063         } else if (message instanceof String) {
1064             return message.toString();
1065         } else {
1066             try {
1067                 return getCoder().encode(message, true);
1068             } catch (CoderException e) {
1069                 throw new IllegalArgumentException("cannot encode message", e);
1070             }
1071         }
1072     }
1073
1074     // these may be overridden by subclasses or junit tests
1075
1076     /**
1077      * Gets the retry count.
1078      *
1079      * @param retry retry, extracted from the parameters, or {@code null}
1080      * @return the number of retries, or {@code 0} if no retries were specified
1081      */
1082     protected int getRetry(Integer retry) {
1083         return (retry == null ? 0 : retry);
1084     }
1085
    /**
     * Gets the retry wait, in milliseconds. This implementation always returns
     * {@code DEFAULT_RETRY_WAIT_MS}.
     *
     * @return the retry wait, in milliseconds
     */
    protected long getRetryWaitMs() {
        return DEFAULT_RETRY_WAIT_MS;
    }
1094
1095     /**
1096      * Gets the target entity, first trying the properties and then the parameters.
1097      *
1098      * @return the target entity
1099      */
1100     protected String getTargetEntity() {
1101         String targetEntity = getProperty(OperationProperties.AAI_TARGET_ENTITY);
1102         return (targetEntity != null ? targetEntity : params.getTargetEntity());
1103     }
1104
1105     /**
1106      * Gets the operation timeout.
1107      *
1108      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1109      *        {@code null}
1110      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1111      *         specified
1112      */
1113     protected long getTimeoutMs(Integer timeoutSec) {
1114         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1115     }
1116
1117     // these may be overridden by junit tests
1118
    /**
     * Gets the coder, which is used by {@code prettyPrint()} to encode messages.
     *
     * @return the coder
     */
    protected Coder getCoder() {
        return coder;
    }
1122 }