2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Partial implementation of an operator. In general, it's preferable that subclasses
 * would override {@link #startOperationAsync(int, OperationOutcome)
 * startOperationAsync()}. However, if that proves to be too difficult, then they can
 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
 * if the operation requires any preprocessor steps, the subclass may choose to override
 * {@link #startPreprocessorAsync()}.
 * <p/>
 * The futures returned by the methods within this class can be canceled, and will
 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
 * returned by overridden methods will do the same. Of course, if a class overrides
 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
 * be done to cancel that particular operation.
 * <p/>
 * In general tasks in a pipeline are executed by the same thread. However, the following
 * should always be executed via the executor specified in "params":
 * <ul>
 * <li>start callback</li>
 * <li>completion callback</li>
 * <li>controller completion (i.e., delayedComplete())</li>
 * </ul>
 */
85 public abstract class OperationPartial implements Operation {
86 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
87 private static final Coder coder = new StandardCoder();
89 public static final String GUARD_ACTOR_NAME = "GUARD";
90 public static final String GUARD_OPERATION_NAME = "Decision";
91 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
93 private final OperatorConfig config;
96 * Operation parameters.
98 protected final ControlLoopOperationParams params;
101 private final String fullName;
104 @Setter(AccessLevel.PROTECTED)
105 private String subRequestId;
108 private final List<String> propertyNames;
111 * Values for the properties identified by {@link #getPropertyNames()}.
113 private final Map<String, Object> properties = new HashMap<>();
/**
 * Constructs the object.
 *
 * @param params operation parameters
 * @param config configuration for this operation
 * @param propertyNames names of properties required by this operation
 */
public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
    this.params = params;
    this.config = config;
    // the full name is fixed at construction time as "<actor>.<operation>"
    this.fullName = params.getActor() + "." + params.getOperation();
    this.propertyNames = propertyNames;
}
130 public Executor getBlockingExecutor() {
131 return config.getBlockingExecutor();
134 public String getActorName() {
135 return params.getActor();
138 public String getName() {
139 return params.getOperation();
143 * Determines if a property has been assigned for the operation.
145 * @param name property name
146 * @return {@code true} if the given property has been assigned for the operation,
147 * {@code false} otherwise
149 public boolean containsProperty(String name) {
150 return properties.containsKey(name);
156 * @param name property name
157 * @param value new value
159 public void setProperty(String name, Object value) {
160 properties.put(name, value);
164 * Gets a property's value.
166 * @param name name of the property of interest
167 * @return the property's value, or {@code null} if it has no value
169 @SuppressWarnings("unchecked")
170 public <T> T getProperty(String name) {
171 return (T) properties.get(name);
175 public CompletableFuture<OperationOutcome> start() {
176 // allocate a controller for the entire operation
177 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
179 CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
180 if (preproc == null) {
181 // no preprocessor required - just start the operation
182 return startOperationAttempt(controller, 1);
186 * Do preprocessor first and then, if successful, start the operation. Note:
187 * operations create their own outcome, ignoring the outcome from any previous
190 * Wrap the preprocessor to ensure "stop" is propagated to it.
193 controller.wrap(preproc)
194 .exceptionally(fromException("preprocessor of operation"))
195 .thenCompose(handlePreprocessorFailure(controller))
196 .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
197 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
204 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
205 * invokes the call-backs, marks the controller complete, and returns an incomplete
206 * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
209 * Assumes that no callbacks have been invoked yet.
211 * @param controller pipeline controller
212 * @return a function that checks the outcome status and continues, if successful, or
213 * indicates a failure otherwise
215 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
216 PipelineControllerFuture<OperationOutcome> controller) {
220 if (isSuccess(outcome)) {
221 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
222 return CompletableFuture.completedFuture(outcome);
225 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
227 final Executor executor = params.getExecutor();
228 final CallbackManager callbacks = new CallbackManager();
230 // propagate "stop" to the callbacks
231 controller.add(callbacks);
233 final OperationOutcome outcome2 = params.makeOutcome();
235 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
237 outcome2.setFinalOutcome(true);
238 outcome2.setResult(PolicyResult.FAILURE_GUARD);
239 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
242 CompletableFuture.completedFuture(outcome2)
243 .whenCompleteAsync(callbackStarted(callbacks), executor)
244 .whenCompleteAsync(callbackCompleted(callbacks), executor)
245 .whenCompleteAsync(controller.delayedComplete(), executor);
248 return new CompletableFuture<>();
253 * Invokes the operation's preprocessor step(s) as a "future". This method simply
254 * returns {@code null}.
256 * This method assumes the following:
258 * <li>the operator is alive</li>
259 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
262 * @return a function that will start the preprocessor and returns its outcome, or
263 * {@code null} if this operation needs no preprocessor
265 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
270 * Invokes the operation's guard step(s) as a "future".
272 * This method assumes the following:
274 * <li>the operator is alive</li>
275 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
278 * @return a function that will start the guard checks and returns its outcome, or
279 * {@code null} if this operation has no guard
281 protected CompletableFuture<OperationOutcome> startGuardAsync() {
282 if (params.isPreprocessed()) {
286 // get the guard payload
287 Map<String, Object> payload = makeGuardPayload();
290 * Note: can't use constants from actor.guard, because that would create a
291 * circular dependency.
293 return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
294 .payload(payload).build().start();
298 * Creates a payload to execute a guard operation.
300 * @return a new guard payload
302 protected Map<String, Object> makeGuardPayload() {
303 // TODO delete this once preprocessing is done by the application
304 Map<String, Object> guard = new LinkedHashMap<>();
305 guard.put("actor", params.getActor());
306 guard.put("operation", params.getOperation());
307 guard.put("target", params.getTargetEntity());
308 guard.put("requestId", params.getRequestId());
310 String clname = params.getContext().getEvent().getClosedLoopControlName();
311 if (clname != null) {
312 guard.put("clname", clname);
319 * Starts the operation attempt, with no preprocessor. When all retries complete, it
320 * will complete the controller.
322 * @param controller controller for all operation attempts
323 * @param attempt attempt number, typically starting with 1
324 * @return a future that will return the final result of all attempts
326 private CompletableFuture<OperationOutcome> startOperationAttempt(
327 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
329 generateSubRequestId(attempt);
331 // propagate "stop" to the operation attempt
332 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
333 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
339 * Generates and sets {@link #subRequestId} to a new subrequest ID.
341 * @param attempt attempt number, typically starting with 1
343 public void generateSubRequestId(int attempt) {
344 // Note: this should be "protected", but that makes junits much messier
346 setSubRequestId(UUID.randomUUID().toString());
350 * Starts the operation attempt, without doing any retries.
352 * @param params operation parameters
353 * @param attempt attempt number, typically starting with 1
354 * @return a future that will return the result of a single operation attempt
356 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
358 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
360 final Executor executor = params.getExecutor();
361 final OperationOutcome outcome = params.makeOutcome();
362 final CallbackManager callbacks = new CallbackManager();
364 // this operation attempt gets its own controller
365 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
367 // propagate "stop" to the callbacks
368 controller.add(callbacks);
371 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
372 .whenCompleteAsync(callbackStarted(callbacks), executor)
373 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
376 // handle timeouts, if specified
377 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
378 if (timeoutMillis > 0) {
379 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
380 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
384 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
385 * before callbackCompleted() is invoked.
387 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
388 * the pipeline as the last step anyway.
392 future.exceptionally(fromException("operation"))
393 .thenApply(setRetryFlag(attempt))
394 .whenCompleteAsync(callbackStarted(callbacks), executor)
395 .whenCompleteAsync(callbackCompleted(callbacks), executor)
396 .whenCompleteAsync(controller.delayedComplete(), executor);
403 * Determines if the outcome was successful.
405 * @param outcome outcome to examine
406 * @return {@code true} if the outcome was successful
408 protected boolean isSuccess(OperationOutcome outcome) {
409 return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
413 * Determines if the outcome was a failure for this operator.
415 * @param outcome outcome to examine, or {@code null}
416 * @return {@code true} if the outcome is not {@code null} and was a failure
417 * <i>and</i> was associated with this operator, {@code false} otherwise
419 protected boolean isActorFailed(OperationOutcome outcome) {
420 return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
424 * Determines if the given outcome is for this operation.
426 * @param outcome outcome to examine
427 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
429 protected boolean isSameOperation(OperationOutcome outcome) {
430 return OperationOutcome.isFor(outcome, getActorName(), getName());
434 * Invokes the operation as a "future". This method simply invokes
435 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
436 * returning the result via a "future".
438 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
439 * the executor in the "params", as that may bring the background thread pool to a
440 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
443 * This method assumes the following:
445 * <li>the operator is alive</li>
446 * <li>verifyRunning() has been invoked</li>
447 * <li>callbackStarted() has been invoked</li>
448 * <li>the invoker will perform appropriate timeout checks</li>
449 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
452 * @param attempt attempt number, typically starting with 1
453 * @return a function that will start the operation and return its result when
456 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
458 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
462 * Low-level method that performs the operation. This can make the same assumptions
463 * that are made by {@link #doOperationAsFuture()}. This particular method simply
464 * throws an {@link UnsupportedOperationException}.
466 * @param attempt attempt number, typically starting with 1
467 * @param operation the operation being performed
468 * @return the outcome of the operation
470 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
472 throw new UnsupportedOperationException("start operation " + getFullName());
476 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
477 * FAILURE, assuming the policy specifies retries and the retry count has been
480 * @param attempt latest attempt number, starting with 1
481 * @return a function to get the next future to execute
483 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
485 return origOutcome -> {
486 // ensure we have a non-null outcome
487 OperationOutcome outcome;
488 if (origOutcome != null) {
489 outcome = origOutcome;
491 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
492 outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
495 // ensure correct actor/operation
496 outcome.setActor(getActorName());
497 outcome.setOperation(getName());
499 // determine if we should retry, based on the result
500 if (outcome.getResult() != PolicyResult.FAILURE) {
501 // do not retry success or other failure types (e.g., exception)
502 outcome.setFinalOutcome(true);
506 int retry = getRetry(params.getRetry());
508 // no retries were specified
509 outcome.setFinalOutcome(true);
511 } else if (attempt <= retry) {
512 // have more retries - not the final outcome
513 outcome.setFinalOutcome(false);
517 * retries were specified and we've already tried them all - change to
520 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
521 outcome.setResult(PolicyResult.FAILURE_RETRIES);
522 outcome.setFinalOutcome(true);
530 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
531 * was previously invoked, and thus that the "operation" is not {@code null}.
533 * @param controller controller for all of the retries
534 * @param attempt latest attempt number, starting with 1
535 * @return a function to get the next future to execute
537 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
538 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
540 return operation -> {
541 if (!isActorFailed(operation)) {
542 // wrong type or wrong operation - just leave it as is
543 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
544 controller.complete(operation);
545 return new CompletableFuture<>();
548 if (getRetry(params.getRetry()) <= 0) {
549 // no retries - already marked as FAILURE, so just return it
550 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
551 controller.complete(operation);
552 return new CompletableFuture<>();
556 * Retry the operation.
558 long waitMs = getRetryWaitMs();
559 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
561 return sleep(waitMs, TimeUnit.MILLISECONDS)
562 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
567 * Convenience method that starts a sleep(), running via a future.
569 * @param sleepTime time to sleep
570 * @param unit time unit
571 * @return a future that will complete when the sleep completes
573 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
574 if (sleepTime <= 0) {
575 return CompletableFuture.completedFuture(null);
578 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
582 * Converts an exception into an operation outcome, returning a copy of the outcome to
583 * prevent background jobs from changing it.
585 * @param type type of item throwing the exception
586 * @return a function that will convert an exception into an operation outcome
588 private Function<Throwable, OperationOutcome> fromException(String type) {
591 OperationOutcome outcome = params.makeOutcome();
593 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
594 // do not include exception in the message, as it just clutters the log
595 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
596 params.getRequestId());
598 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
599 params.getRequestId(), thrown);
602 return setOutcome(outcome, thrown);
607 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
608 * any outstanding futures when one completes.
610 * @param futureMakers function to make a future. If the function returns
611 * {@code null}, then no future is created for that function. On the other
612 * hand, if the function throws an exception, then the previously created
613 * functions are canceled and the exception is re-thrown
614 * @return a future to cancel or await an outcome, or {@code null} if no futures were
615 * created. If this future is canceled, then all of the futures will be
618 public CompletableFuture<OperationOutcome> anyOf(
619 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
621 return anyOf(Arrays.asList(futureMakers));
625 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
626 * any outstanding futures when one completes.
628 * @param futureMakers function to make a future. If the function returns
629 * {@code null}, then no future is created for that function. On the other
630 * hand, if the function throws an exception, then the previously created
631 * functions are canceled and the exception is re-thrown
632 * @return a future to cancel or await an outcome, or {@code null} if no futures were
633 * created. If this future is canceled, then all of the futures will be
634 * canceled. Similarly, when this future completes, any incomplete futures
637 public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
639 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
641 CompletableFuture<OperationOutcome>[] futures =
642 attachFutures(controller, futureMakers, UnaryOperator.identity());
644 if (futures.length == 0) {
645 // no futures were started
649 if (futures.length == 1) {
653 CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
654 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
660 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
662 * @param futureMakers function to make a future. If the function returns
663 * {@code null}, then no future is created for that function. On the other
664 * hand, if the function throws an exception, then the previously created
665 * functions are canceled and the exception is re-thrown
666 * @return a future to cancel or await an outcome, or {@code null} if no futures were
667 * created. If this future is canceled, then all of the futures will be
670 public CompletableFuture<OperationOutcome> allOf(
671 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
673 return allOf(Arrays.asList(futureMakers));
677 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
679 * @param futureMakers function to make a future. If the function returns
680 * {@code null}, then no future is created for that function. On the other
681 * hand, if the function throws an exception, then the previously created
682 * functions are canceled and the exception is re-thrown
683 * @return a future to cancel or await an outcome, or {@code null} if no futures were
684 * created. If this future is canceled, then all of the futures will be
685 * canceled. Similarly, when this future completes, any incomplete futures
688 public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
689 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
691 Queue<OperationOutcome> outcomes = new LinkedList<>();
693 CompletableFuture<OperationOutcome>[] futures =
694 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
695 synchronized (outcomes) {
696 outcomes.add(outcome);
701 if (futures.length == 0) {
702 // no futures were started
706 if (futures.length == 1) {
711 CompletableFuture.allOf(futures)
712 .thenApply(unused -> combineOutcomes(outcomes))
713 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
720 * Invokes the functions to create the futures and attaches them to the controller.
722 * @param controller master controller for all of the futures
723 * @param futureMakers futures to be attached to the controller
724 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
725 * Returns the adorned future
726 * @return an array of futures, possibly zero-length. If the array is of size one,
727 * then that one item should be returned instead of the controller
729 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
730 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
731 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
733 if (futureMakers.isEmpty()) {
734 @SuppressWarnings("unchecked")
735 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
739 // the last, unadorned future that is created
740 CompletableFuture<OperationOutcome> lastFuture = null;
742 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
745 for (var maker : futureMakers) {
747 CompletableFuture<OperationOutcome> future = maker.get();
748 if (future == null) {
752 // propagate "stop" to the future
753 controller.add(future);
755 futures.add(adorn.apply(future));
759 } catch (RuntimeException e) {
760 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
761 controller.cancel(false);
766 @SuppressWarnings("unchecked")
767 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
769 if (result.length == 1) {
770 // special case - return the unadorned future
771 result[0] = lastFuture;
775 return futures.toArray(result);
779 * Combines the outcomes from a set of tasks.
781 * @param outcomes outcomes to be examined
782 * @return the combined outcome
784 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
786 // identify the outcome with the highest priority
787 OperationOutcome outcome = outcomes.remove();
788 int priority = detmPriority(outcome);
790 for (OperationOutcome outcome2 : outcomes) {
791 int priority2 = detmPriority(outcome2);
793 if (priority2 > priority) {
795 priority = priority2;
799 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
800 (outcome == null ? null : outcome.getResult()), params.getRequestId());
806 * Determines the priority of an outcome based on its result.
808 * @param outcome outcome to examine, or {@code null}
809 * @return the outcome's priority
811 protected int detmPriority(OperationOutcome outcome) {
812 if (outcome == null || outcome.getResult() == null) {
816 switch (outcome.getResult()) {
823 case FAILURE_RETRIES:
829 case FAILURE_TIMEOUT:
832 case FAILURE_EXCEPTION:
839 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
840 * not created until the previous task completes. The pipeline returns the outcome of
841 * the last task executed.
843 * @param futureMakers functions to make the futures
844 * @return a future to cancel the sequence or await the outcome
846 public CompletableFuture<OperationOutcome> sequence(
847 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
849 return sequence(Arrays.asList(futureMakers));
853 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
854 * not created until the previous task completes. The pipeline returns the outcome of
855 * the last task executed.
857 * @param futureMakers functions to make the futures
858 * @return a future to cancel the sequence or await the outcome, or {@code null} if
859 * there were no tasks to perform
861 public CompletableFuture<OperationOutcome> sequence(
862 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
864 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
866 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
867 if (nextTask == null) {
872 if (queue.isEmpty()) {
873 // only one task - just return it rather than wrapping it in a controller
878 * multiple tasks - need a controller to stop whichever task is currently
881 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
882 final Executor executor = params.getExecutor();
885 controller.wrap(nextTask)
886 .thenCompose(nextTaskOnSuccess(controller, queue))
887 .whenCompleteAsync(controller.delayedComplete(), executor);
894 * Executes the next task in the queue, if the previous outcome was successful.
896 * @param controller pipeline controller
897 * @param taskQueue queue of tasks to be performed
898 * @return a future to execute the remaining tasks, or the current outcome, if it's a
899 * failure, or if there are no more tasks
901 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
902 PipelineControllerFuture<OperationOutcome> controller,
903 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
906 if (!isSuccess(outcome)) {
907 // return the failure
908 return CompletableFuture.completedFuture(outcome);
911 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
912 if (nextTask == null) {
913 // no tasks - just return the success
914 return CompletableFuture.completedFuture(outcome);
920 .thenCompose(nextTaskOnSuccess(controller, taskQueue));
926 * Gets the next task from the queue, skipping those that are {@code null}.
928 * @param taskQueue task queue
929 * @return the next task, or {@code null} if the queue is now empty
931 private CompletableFuture<OperationOutcome> getNextTask(
932 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
934 Supplier<CompletableFuture<OperationOutcome>> maker;
936 while ((maker = taskQueue.poll()) != null) {
937 CompletableFuture<OperationOutcome> future = maker.get();
938 if (future != null) {
947 * Sets the start time of the operation and invokes the callback to indicate that the
948 * operation has started. Does nothing if the pipeline has been stopped.
950 * This assumes that the "outcome" is not {@code null}.
952 * @param callbacks used to determine if the start callback can be invoked
953 * @return a function that sets the start time and invokes the callback
955 protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
957 return (outcome, thrown) -> {
959 if (callbacks.canStart()) {
960 outcome.setSubRequestId(getSubRequestId());
961 outcome.setStart(callbacks.getStartTime());
962 outcome.setEnd(null);
964 // pass a copy to the callback
965 OperationOutcome outcome2 = new OperationOutcome(outcome);
966 outcome2.setFinalOutcome(false);
967 params.callbackStarted(outcome2);
973 * Sets the end time of the operation and invokes the callback to indicate that the
974 * operation has completed. Does nothing if the pipeline has been stopped.
976 * This assumes that the "outcome" is not {@code null}.
978 * Note: the start time must be a reference rather than a plain value, because it's
979 * value must be gotten on-demand, when the returned function is executed at a later
982 * @param callbacks used to determine if the end callback can be invoked
983 * @return a function that sets the end time and invokes the callback
985 protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
987 return (outcome, thrown) -> {
988 if (callbacks.canEnd()) {
989 outcome.setSubRequestId(getSubRequestId());
990 outcome.setStart(callbacks.getStartTime());
991 outcome.setEnd(callbacks.getEndTime());
993 // pass a copy to the callback
994 params.callbackCompleted(new OperationOutcome(outcome));
1000 * Sets an operation's outcome and message, based on a throwable.
1002 * @param operation operation to be updated
1003 * @return the updated operation
1005 public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
1006 PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
1007 return setOutcome(operation, result);
1011 * Sets an operation's outcome and default message based on the result.
1013 * @param operation operation to be updated
1014 * @param result result of the operation
1015 * @return the updated operation
1017 public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
1018 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
1019 operation.setResult(result);
1020 operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
1021 : ControlLoopOperation.FAILED_MSG);
1027 * Determines if a throwable is due to a timeout.
1029 * @param thrown throwable of interest
1030 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
1032 protected boolean isTimeout(Throwable thrown) {
1033 if (thrown instanceof CompletionException) {
1034 thrown = thrown.getCause();
1037 return (thrown instanceof TimeoutException);
1041 * Logs a message. If the message is not of type, String, then it attempts to
1042 * pretty-print it into JSON before logging.
1044 * @param direction IN or OUT
1045 * @param infra communication infrastructure on which it was published
1046 * @param source source name (e.g., the URL or Topic name)
1047 * @param message message to be logged
1048 * @return the JSON text that was logged
1050 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
1053 json = prettyPrint(message);
1055 } catch (IllegalArgumentException e) {
1056 String type = (direction == EventType.IN ? "response" : "request");
1057 logger.warn("cannot pretty-print {}", type, e);
1058 json = message.toString();
1061 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1067 * Converts a message to a "pretty-printed" String using the operation's normal
1068 * serialization provider (i.e., it's <i>coder</i>).
1070 * @param message response to be logged
1071 * @return the JSON text that was logged
1072 * @throws IllegalArgumentException if the message cannot be converted
1074 public <T> String prettyPrint(T message) {
1075 if (message == null) {
1077 } else if (message instanceof String) {
1078 return message.toString();
1081 return getCoder().encode(message, true);
1082 } catch (CoderException e) {
1083 throw new IllegalArgumentException("cannot encode message", e);
1088 // these may be overridden by subclasses or junit tests
1091 * Gets the retry count.
1093 * @param retry retry, extracted from the parameters, or {@code null}
1094 * @return the number of retries, or {@code 0} if no retries were specified
1096 protected int getRetry(Integer retry) {
1097 return (retry == null ? 0 : retry);
1101 * Gets the retry wait, in milliseconds.
1103 * @return the retry wait, in milliseconds
1105 protected long getRetryWaitMs() {
1106 return DEFAULT_RETRY_WAIT_MS;
1110 * Gets the operation timeout.
1112 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1114 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1117 protected long getTimeoutMs(Integer timeoutSec) {
1118 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1121 // these may be overridden by junit tests
1123 protected Coder getCoder() {