2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedHashMap;
27 import java.util.LinkedList;
28 import java.util.List;
30 import java.util.Queue;
31 import java.util.UUID;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.CompletionException;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.function.BiConsumer;
39 import java.util.function.Function;
40 import java.util.function.Supplier;
41 import java.util.function.UnaryOperator;
42 import lombok.AccessLevel;
45 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
46 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
48 import org.onap.policy.common.utils.coder.Coder;
49 import org.onap.policy.common.utils.coder.CoderException;
50 import org.onap.policy.common.utils.coder.StandardCoder;
51 import org.onap.policy.controlloop.ControlLoopOperation;
52 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
53 import org.onap.policy.controlloop.actorserviceprovider.Operation;
54 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
55 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
56 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
57 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
58 import org.onap.policy.controlloop.policy.PolicyResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
63 * Partial implementation of an operator. In general, it's preferable that subclasses
64 * would override {@link #startOperationAsync(int, OperationOutcome)
65 * startOperationAsync()}. However, if that proves to be too difficult, then they can
66 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
67 * if the operation requires any preprocessor steps, the subclass may choose to override
68 * {@link #startPreprocessorAsync()}.
70 * The futures returned by the methods within this class can be canceled, and will
71 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
72 * returned by overridden methods will do the same. Of course, if a class overrides
73 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
74 * be done to cancel that particular operation.
76 * In general tasks in a pipeline are executed by the same thread. However, the following
77 * should always be executed via the executor specified in "params":
79 * <li>start callback</li>
80 * <li>completion callback</li>
81 * <li>controller completion (i.e., delayedComplete())</li>
84 public abstract class OperationPartial implements Operation {
85 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
86 private static final Coder coder = new StandardCoder();
88 public static final String GUARD_ACTOR_NAME = "GUARD";
89 public static final String GUARD_OPERATION_NAME = "Decision";
90 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
92 private final OperatorConfig config;
95 * Operation parameters.
97 protected final ControlLoopOperationParams params;
100 private final String fullName;
103 @Setter(AccessLevel.PROTECTED)
104 private String subRequestId;
108 * Constructs the object.
110 * @param params operation parameters
111 * @param config configuration for this operation
113 public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
114 this.params = params;
115 this.config = config;
116 this.fullName = params.getActor() + "." + params.getOperation();
119 public Executor getBlockingExecutor() {
120 return config.getBlockingExecutor();
123 public String getActorName() {
124 return params.getActor();
127 public String getName() {
128 return params.getOperation();
132 public CompletableFuture<OperationOutcome> start() {
133 // allocate a controller for the entire operation
134 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
136 CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
137 if (preproc == null) {
138 // no preprocessor required - just start the operation
139 return startOperationAttempt(controller, 1);
143 * Do preprocessor first and then, if successful, start the operation. Note:
144 * operations create their own outcome, ignoring the outcome from any previous
147 * Wrap the preprocessor to ensure "stop" is propagated to it.
150 controller.wrap(preproc)
151 .exceptionally(fromException("preprocessor of operation"))
152 .thenCompose(handlePreprocessorFailure(controller))
153 .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
154 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
161 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
162 * invokes the call-backs, marks the controller complete, and returns an incomplete
163 * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
166 * Assumes that no callbacks have been invoked yet.
168 * @param controller pipeline controller
169 * @return a function that checks the outcome status and continues, if successful, or
170 * indicates a failure otherwise
172 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
173 PipelineControllerFuture<OperationOutcome> controller) {
177 if (isSuccess(outcome)) {
178 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
179 return CompletableFuture.completedFuture(outcome);
182 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
184 final Executor executor = params.getExecutor();
185 final CallbackManager callbacks = new CallbackManager();
187 // propagate "stop" to the callbacks
188 controller.add(callbacks);
190 final OperationOutcome outcome2 = params.makeOutcome();
192 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
194 outcome2.setFinalOutcome(true);
195 outcome2.setResult(PolicyResult.FAILURE_GUARD);
196 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
199 CompletableFuture.completedFuture(outcome2)
200 .whenCompleteAsync(callbackStarted(callbacks), executor)
201 .whenCompleteAsync(callbackCompleted(callbacks), executor)
202 .whenCompleteAsync(controller.delayedComplete(), executor);
205 return new CompletableFuture<>();
210 * Invokes the operation's preprocessor step(s) as a "future". This method simply
211 * returns {@code null}.
213 * This method assumes the following:
215 * <li>the operator is alive</li>
216 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
219 * @return a function that will start the preprocessor and returns its outcome, or
220 * {@code null} if this operation needs no preprocessor
222 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
227 * Invokes the operation's guard step(s) as a "future".
229 * This method assumes the following:
231 * <li>the operator is alive</li>
232 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
235 * @return a function that will start the guard checks and returns its outcome, or
236 * {@code null} if this operation has no guard
238 protected CompletableFuture<OperationOutcome> startGuardAsync() {
239 // get the guard payload
240 Map<String, Object> payload = makeGuardPayload();
243 * Note: can't use constants from actor.guard, because that would create a
244 * circular dependency.
246 return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
247 .payload(payload).build().start();
251 * Creates a payload to execute a guard operation.
253 * @return a new guard payload
255 protected Map<String, Object> makeGuardPayload() {
256 Map<String, Object> guard = new LinkedHashMap<>();
257 guard.put("actor", params.getActor());
258 guard.put("operation", params.getOperation());
259 guard.put("target", params.getTargetEntity());
260 guard.put("requestId", params.getRequestId());
262 String clname = params.getContext().getEvent().getClosedLoopControlName();
263 if (clname != null) {
264 guard.put("clname", clname);
271 * Starts the operation attempt, with no preprocessor. When all retries complete, it
272 * will complete the controller.
274 * @param controller controller for all operation attempts
275 * @param attempt attempt number, typically starting with 1
276 * @return a future that will return the final result of all attempts
278 private CompletableFuture<OperationOutcome> startOperationAttempt(
279 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
281 generateSubRequestId(attempt);
283 // propagate "stop" to the operation attempt
284 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
285 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
291 * Generates and sets {@link #subRequestId} to a new subrequest ID.
292 * @param attempt attempt number, typically starting with 1
294 public void generateSubRequestId(int attempt) {
295 // Note: this should be "protected", but that makes junits much messier
297 setSubRequestId(UUID.randomUUID().toString());
301 * Starts the operation attempt, without doing any retries.
303 * @param params operation parameters
304 * @param attempt attempt number, typically starting with 1
305 * @return a future that will return the result of a single operation attempt
307 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
309 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
311 final Executor executor = params.getExecutor();
312 final OperationOutcome outcome = params.makeOutcome();
313 final CallbackManager callbacks = new CallbackManager();
315 // this operation attempt gets its own controller
316 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
318 // propagate "stop" to the callbacks
319 controller.add(callbacks);
322 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
323 .whenCompleteAsync(callbackStarted(callbacks), executor)
324 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
327 // handle timeouts, if specified
328 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
329 if (timeoutMillis > 0) {
330 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
331 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
335 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
336 * before callbackCompleted() is invoked.
338 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
339 * the pipeline as the last step anyway.
343 future.exceptionally(fromException("operation"))
344 .thenApply(setRetryFlag(attempt))
345 .whenCompleteAsync(callbackStarted(callbacks), executor)
346 .whenCompleteAsync(callbackCompleted(callbacks), executor)
347 .whenCompleteAsync(controller.delayedComplete(), executor);
354 * Determines if the outcome was successful.
356 * @param outcome outcome to examine
357 * @return {@code true} if the outcome was successful
359 protected boolean isSuccess(OperationOutcome outcome) {
360 return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
364 * Determines if the outcome was a failure for this operator.
366 * @param outcome outcome to examine, or {@code null}
367 * @return {@code true} if the outcome is not {@code null} and was a failure
368 * <i>and</i> was associated with this operator, {@code false} otherwise
370 protected boolean isActorFailed(OperationOutcome outcome) {
371 return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
375 * Determines if the given outcome is for this operation.
377 * @param outcome outcome to examine
378 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
380 protected boolean isSameOperation(OperationOutcome outcome) {
381 return OperationOutcome.isFor(outcome, getActorName(), getName());
385 * Invokes the operation as a "future". This method simply invokes
386 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
387 * returning the result via a "future".
389 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
390 * the executor in the "params", as that may bring the background thread pool to a
391 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
394 * This method assumes the following:
396 * <li>the operator is alive</li>
397 * <li>verifyRunning() has been invoked</li>
398 * <li>callbackStarted() has been invoked</li>
399 * <li>the invoker will perform appropriate timeout checks</li>
400 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
403 * @param attempt attempt number, typically starting with 1
404 * @return a function that will start the operation and return its result when
407 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
409 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
413 * Low-level method that performs the operation. This can make the same assumptions
414 * that are made by {@link #doOperationAsFuture()}. This particular method simply
415 * throws an {@link UnsupportedOperationException}.
417 * @param attempt attempt number, typically starting with 1
418 * @param operation the operation being performed
419 * @return the outcome of the operation
421 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
423 throw new UnsupportedOperationException("start operation " + getFullName());
427 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
428 * FAILURE, assuming the policy specifies retries and the retry count has been
431 * @param attempt latest attempt number, starting with 1
432 * @return a function to get the next future to execute
434 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
436 return origOutcome -> {
437 // ensure we have a non-null outcome
438 OperationOutcome outcome;
439 if (origOutcome != null) {
440 outcome = origOutcome;
442 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
443 outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
446 // ensure correct actor/operation
447 outcome.setActor(getActorName());
448 outcome.setOperation(getName());
450 // determine if we should retry, based on the result
451 if (outcome.getResult() != PolicyResult.FAILURE) {
452 // do not retry success or other failure types (e.g., exception)
453 outcome.setFinalOutcome(true);
457 int retry = getRetry(params.getRetry());
459 // no retries were specified
460 outcome.setFinalOutcome(true);
462 } else if (attempt <= retry) {
463 // have more retries - not the final outcome
464 outcome.setFinalOutcome(false);
468 * retries were specified and we've already tried them all - change to
471 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
472 outcome.setResult(PolicyResult.FAILURE_RETRIES);
473 outcome.setFinalOutcome(true);
481 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
482 * was previously invoked, and thus that the "operation" is not {@code null}.
484 * @param controller controller for all of the retries
485 * @param attempt latest attempt number, starting with 1
486 * @return a function to get the next future to execute
488 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
489 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
491 return operation -> {
492 if (!isActorFailed(operation)) {
493 // wrong type or wrong operation - just leave it as is
494 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
495 controller.complete(operation);
496 return new CompletableFuture<>();
499 if (getRetry(params.getRetry()) <= 0) {
500 // no retries - already marked as FAILURE, so just return it
501 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
502 controller.complete(operation);
503 return new CompletableFuture<>();
507 * Retry the operation.
509 long waitMs = getRetryWaitMs();
510 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
512 return sleep(waitMs, TimeUnit.MILLISECONDS)
513 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
518 * Convenience method that starts a sleep(), running via a future.
520 * @param sleepTime time to sleep
521 * @param unit time unit
522 * @return a future that will complete when the sleep completes
524 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
525 if (sleepTime <= 0) {
526 return CompletableFuture.completedFuture(null);
529 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
533 * Converts an exception into an operation outcome, returning a copy of the outcome to
534 * prevent background jobs from changing it.
536 * @param type type of item throwing the exception
537 * @return a function that will convert an exception into an operation outcome
539 private Function<Throwable, OperationOutcome> fromException(String type) {
542 OperationOutcome outcome = params.makeOutcome();
544 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
545 // do not include exception in the message, as it just clutters the log
546 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
547 params.getRequestId());
549 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
550 params.getRequestId(), thrown);
553 return setOutcome(outcome, thrown);
558 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
559 * any outstanding futures when one completes.
561 * @param futureMakers function to make a future. If the function returns
562 * {@code null}, then no future is created for that function. On the other
563 * hand, if the function throws an exception, then the previously created
564 * functions are canceled and the exception is re-thrown
565 * @return a future to cancel or await an outcome, or {@code null} if no futures were
566 * created. If this future is canceled, then all of the futures will be
569 public CompletableFuture<OperationOutcome> anyOf(
570 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
572 return anyOf(Arrays.asList(futureMakers));
576 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
577 * any outstanding futures when one completes.
579 * @param futureMakers function to make a future. If the function returns
580 * {@code null}, then no future is created for that function. On the other
581 * hand, if the function throws an exception, then the previously created
582 * functions are canceled and the exception is re-thrown
583 * @return a future to cancel or await an outcome, or {@code null} if no futures were
584 * created. If this future is canceled, then all of the futures will be
585 * canceled. Similarly, when this future completes, any incomplete futures
588 public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
590 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
592 CompletableFuture<OperationOutcome>[] futures =
593 attachFutures(controller, futureMakers, UnaryOperator.identity());
595 if (futures.length == 0) {
596 // no futures were started
600 if (futures.length == 1) {
604 CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
605 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
611 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
613 * @param futureMakers function to make a future. If the function returns
614 * {@code null}, then no future is created for that function. On the other
615 * hand, if the function throws an exception, then the previously created
616 * functions are canceled and the exception is re-thrown
617 * @return a future to cancel or await an outcome, or {@code null} if no futures were
618 * created. If this future is canceled, then all of the futures will be
621 public CompletableFuture<OperationOutcome> allOf(
622 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
624 return allOf(Arrays.asList(futureMakers));
628 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
630 * @param futureMakers function to make a future. If the function returns
631 * {@code null}, then no future is created for that function. On the other
632 * hand, if the function throws an exception, then the previously created
633 * functions are canceled and the exception is re-thrown
634 * @return a future to cancel or await an outcome, or {@code null} if no futures were
635 * created. If this future is canceled, then all of the futures will be
636 * canceled. Similarly, when this future completes, any incomplete futures
639 public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
640 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
642 Queue<OperationOutcome> outcomes = new LinkedList<>();
644 CompletableFuture<OperationOutcome>[] futures =
645 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
646 synchronized (outcomes) {
647 outcomes.add(outcome);
652 if (futures.length == 0) {
653 // no futures were started
657 if (futures.length == 1) {
662 CompletableFuture.allOf(futures)
663 .thenApply(unused -> combineOutcomes(outcomes))
664 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
671 * Invokes the functions to create the futures and attaches them to the controller.
673 * @param controller master controller for all of the futures
674 * @param futureMakers futures to be attached to the controller
675 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
676 * Returns the adorned future
677 * @return an array of futures, possibly zero-length. If the array is of size one,
678 * then that one item should be returned instead of the controller
680 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
681 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
682 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
684 if (futureMakers.isEmpty()) {
685 @SuppressWarnings("unchecked")
686 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
690 // the last, unadorned future that is created
691 CompletableFuture<OperationOutcome> lastFuture = null;
693 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
696 for (var maker : futureMakers) {
698 CompletableFuture<OperationOutcome> future = maker.get();
699 if (future == null) {
703 // propagate "stop" to the future
704 controller.add(future);
706 futures.add(adorn.apply(future));
710 } catch (RuntimeException e) {
711 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
712 controller.cancel(false);
717 @SuppressWarnings("unchecked")
718 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
720 if (result.length == 1) {
721 // special case - return the unadorned future
722 result[0] = lastFuture;
726 return futures.toArray(result);
730 * Combines the outcomes from a set of tasks.
732 * @param outcomes outcomes to be examined
733 * @return the combined outcome
735 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
737 // identify the outcome with the highest priority
738 OperationOutcome outcome = outcomes.remove();
739 int priority = detmPriority(outcome);
741 for (OperationOutcome outcome2 : outcomes) {
742 int priority2 = detmPriority(outcome2);
744 if (priority2 > priority) {
746 priority = priority2;
750 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
751 (outcome == null ? null : outcome.getResult()), params.getRequestId());
757 * Determines the priority of an outcome based on its result.
759 * @param outcome outcome to examine, or {@code null}
760 * @return the outcome's priority
762 protected int detmPriority(OperationOutcome outcome) {
763 if (outcome == null || outcome.getResult() == null) {
767 switch (outcome.getResult()) {
774 case FAILURE_RETRIES:
780 case FAILURE_TIMEOUT:
783 case FAILURE_EXCEPTION:
790 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
791 * not created until the previous task completes. The pipeline returns the outcome of
792 * the last task executed.
794 * @param futureMakers functions to make the futures
795 * @return a future to cancel the sequence or await the outcome
797 public CompletableFuture<OperationOutcome> sequence(
798 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
800 return sequence(Arrays.asList(futureMakers));
804 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
805 * not created until the previous task completes. The pipeline returns the outcome of
806 * the last task executed.
808 * @param futureMakers functions to make the futures
809 * @return a future to cancel the sequence or await the outcome, or {@code null} if
810 * there were no tasks to perform
812 public CompletableFuture<OperationOutcome> sequence(
813 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
815 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
817 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
818 if (nextTask == null) {
823 if (queue.isEmpty()) {
824 // only one task - just return it rather than wrapping it in a controller
829 * multiple tasks - need a controller to stop whichever task is currently
832 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
833 final Executor executor = params.getExecutor();
836 controller.wrap(nextTask)
837 .thenCompose(nextTaskOnSuccess(controller, queue))
838 .whenCompleteAsync(controller.delayedComplete(), executor);
845 * Executes the next task in the queue, if the previous outcome was successful.
847 * @param controller pipeline controller
848 * @param taskQueue queue of tasks to be performed
849 * @return a future to execute the remaining tasks, or the current outcome, if it's a
850 * failure, or if there are no more tasks
852 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
853 PipelineControllerFuture<OperationOutcome> controller,
854 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
857 if (!isSuccess(outcome)) {
858 // return the failure
859 return CompletableFuture.completedFuture(outcome);
862 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
863 if (nextTask == null) {
864 // no tasks - just return the success
865 return CompletableFuture.completedFuture(outcome);
871 .thenCompose(nextTaskOnSuccess(controller, taskQueue));
877 * Gets the next task from the queue, skipping those that are {@code null}.
879 * @param taskQueue task queue
880 * @return the next task, or {@code null} if the queue is now empty
882 private CompletableFuture<OperationOutcome> getNextTask(
883 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
885 Supplier<CompletableFuture<OperationOutcome>> maker;
887 while ((maker = taskQueue.poll()) != null) {
888 CompletableFuture<OperationOutcome> future = maker.get();
889 if (future != null) {
898 * Sets the start time of the operation and invokes the callback to indicate that the
899 * operation has started. Does nothing if the pipeline has been stopped.
901 * This assumes that the "outcome" is not {@code null}.
903 * @param callbacks used to determine if the start callback can be invoked
904 * @return a function that sets the start time and invokes the callback
906 protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
908 return (outcome, thrown) -> {
910 if (callbacks.canStart()) {
911 outcome.setSubRequestId(getSubRequestId());
912 outcome.setStart(callbacks.getStartTime());
913 outcome.setEnd(null);
915 // pass a copy to the callback
916 OperationOutcome outcome2 = new OperationOutcome(outcome);
917 outcome2.setFinalOutcome(false);
918 params.callbackStarted(outcome2);
924 * Sets the end time of the operation and invokes the callback to indicate that the
925 * operation has completed. Does nothing if the pipeline has been stopped.
927 * This assumes that the "outcome" is not {@code null}.
929 * Note: the start time must be a reference rather than a plain value, because it's
930 * value must be gotten on-demand, when the returned function is executed at a later
933 * @param callbacks used to determine if the end callback can be invoked
934 * @return a function that sets the end time and invokes the callback
936 protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
938 return (outcome, thrown) -> {
939 if (callbacks.canEnd()) {
940 outcome.setSubRequestId(getSubRequestId());
941 outcome.setStart(callbacks.getStartTime());
942 outcome.setEnd(callbacks.getEndTime());
944 // pass a copy to the callback
945 params.callbackCompleted(new OperationOutcome(outcome));
951 * Sets an operation's outcome and message, based on a throwable.
953 * @param operation operation to be updated
954 * @return the updated operation
956 public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
957 PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
958 return setOutcome(operation, result);
962 * Sets an operation's outcome and default message based on the result.
964 * @param operation operation to be updated
965 * @param result result of the operation
966 * @return the updated operation
968 public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
969 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
970 operation.setResult(result);
971 operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
972 : ControlLoopOperation.FAILED_MSG);
978 * Determines if a throwable is due to a timeout.
980 * @param thrown throwable of interest
981 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
983 protected boolean isTimeout(Throwable thrown) {
984 if (thrown instanceof CompletionException) {
985 thrown = thrown.getCause();
988 return (thrown instanceof TimeoutException);
992 * Logs a message. If the message is not of type, String, then it attempts to
993 * pretty-print it into JSON before logging.
995 * @param direction IN or OUT
996 * @param infra communication infrastructure on which it was published
997 * @param source source name (e.g., the URL or Topic name)
998 * @param message message to be logged
999 * @return the JSON text that was logged
1001 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
1004 json = prettyPrint(message);
1006 } catch (IllegalArgumentException e) {
1007 String type = (direction == EventType.IN ? "response" : "request");
1008 logger.warn("cannot pretty-print {}", type, e);
1009 json = message.toString();
1012 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1018 * Converts a message to a "pretty-printed" String using the operation's normal
1019 * serialization provider (i.e., it's <i>coder</i>).
1021 * @param message response to be logged
1022 * @return the JSON text that was logged
1023 * @throws IllegalArgumentException if the message cannot be converted
1025 public <T> String prettyPrint(T message) {
1026 if (message == null) {
1028 } else if (message instanceof String) {
1029 return message.toString();
1032 return makeCoder().encode(message, true);
1033 } catch (CoderException e) {
1034 throw new IllegalArgumentException("cannot encode message", e);
1039 // these may be overridden by subclasses or junit tests
1042 * Gets the retry count.
1044 * @param retry retry, extracted from the parameters, or {@code null}
1045 * @return the number of retries, or {@code 0} if no retries were specified
1047 protected int getRetry(Integer retry) {
1048 return (retry == null ? 0 : retry);
1052 * Gets the retry wait, in milliseconds.
1054 * @return the retry wait, in milliseconds
1056 protected long getRetryWaitMs() {
1057 return DEFAULT_RETRY_WAIT_MS;
1061 * Gets the operation timeout.
1063 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1065 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1068 protected long getTimeoutMs(Integer timeoutSec) {
1069 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1072 // these may be overridden by junit tests
1074 protected Coder makeCoder() {