2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedHashMap;
28 import java.util.LinkedList;
29 import java.util.List;
31 import java.util.Queue;
32 import java.util.UUID;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.CompletableFuture;
35 import java.util.concurrent.CompletionException;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39 import java.util.function.BiConsumer;
40 import java.util.function.Function;
41 import java.util.function.Supplier;
42 import java.util.function.UnaryOperator;
43 import lombok.AccessLevel;
46 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
48 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
49 import org.onap.policy.common.utils.coder.Coder;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.controlloop.ControlLoopOperation;
53 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
54 import org.onap.policy.controlloop.actorserviceprovider.Operation;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
57 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
59 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
60 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
65 * Partial implementation of an operator. In general, it's preferable that subclasses
66 * would override {@link #startOperationAsync(int, OperationOutcome)
67 * startOperationAsync()}. However, if that proves to be too difficult, then they can
68 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
69 * if the operation requires any preprocessor steps, the subclass may choose to override
70 * {@link #startPreprocessorAsync()}.
72 * The futures returned by the methods within this class can be canceled, and will
73 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
74 * returned by overridden methods will do the same. Of course, if a class overrides
75 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
76 * be done to cancel that particular operation.
78 * In general tasks in a pipeline are executed by the same thread. However, the following
79 * should always be executed via the executor specified in "params":
81 * <li>start callback</li>
82 * <li>completion callback</li>
83 * <li>controller completion (i.e., delayedComplete())</li>
// Base class for operations: holds the shared logger/coder, the guard constants and the
// per-operation state (parameters, configuration, sub-request ID, properties).
// NOTE(review): several short lines (annotations, comment delimiters) appear to be missing
// from this declaration section in the reviewed text - confirm against the repository copy.
86 public abstract class OperationPartial implements Operation {
// class-wide SLF4J logger
87 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
// NOTE(review): not referenced in the visible part of the file; presumably used by the
// message-logging helper further down - confirm
88 private static final Coder coder = new StandardCoder();
// actor and operation names that startGuardAsync() plugs into the guard request
90 public static final String GUARD_ACTOR_NAME = "GUARD";
91 public static final String GUARD_OPERATION_NAME = "Decision";
// NOTE(review): presumably the default delay between retries, in milliseconds; the code that
// reads it is outside this view - confirm
92 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
// operator configuration; getBlockingExecutor() reads the blocking executor from it
94 private final OperatorConfig config;
97 * Operation parameters.
99 protected final ControlLoopOperationParams params;
// "<actor>.<operation>", computed once by the constructor
102 private final String fullName;
// assigned by generateSubRequestId() at the start of every attempt and copied into the
// outcomes passed to the start/completion callbacks
105 @Setter(AccessLevel.PROTECTED)
106 private String subRequestId;
// names of the properties required by this operation, as supplied to the constructor
109 private final List<String> propertyNames;
112 * Values for the properties identified by {@link #getPropertyNames()}.
// plain HashMap, written by setProperty() and read by getProperty()/containsProperty()
114 private final Map<String, Object> properties = new HashMap<>();
/**
 * Constructs the object.
 *
 * @param params operation parameters; must not be {@code null}, as the actor and
 *        operation names are read from it immediately
 * @param config configuration for this operation
 * @param propertyNames names of properties required by this operation
 */
public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
    this.params = params;
    this.config = config;
    // "<actor>.<operation>", computed once rather than on every use
    this.fullName = params.getActor() + "." + params.getOperation();
    this.propertyNames = propertyNames;
}
131 public Executor getBlockingExecutor() {
132 return config.getBlockingExecutor();
136 public String getActorName() {
137 return params.getActor();
141 public String getName() {
142 return params.getOperation();
146 public boolean containsProperty(String name) {
147 return properties.containsKey(name);
151 public void setProperty(String name, Object value) {
152 logger.info("{}: set property {}={}", getFullName(), name, value);
153 properties.put(name, value);
156 @SuppressWarnings("unchecked")
158 public <T> T getProperty(String name) {
159 return (T) properties.get(name);
163 public CompletableFuture<OperationOutcome> start() {
164 // allocate a controller for the entire operation
165 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
167 CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
168 if (preproc == null) {
169 // no preprocessor required - just start the operation
170 return startOperationAttempt(controller, 1);
174 * Do preprocessor first and then, if successful, start the operation. Note:
175 * operations create their own outcome, ignoring the outcome from any previous
178 * Wrap the preprocessor to ensure "stop" is propagated to it.
181 controller.wrap(preproc)
182 .exceptionally(fromException("preprocessor of operation"))
183 .thenCompose(handlePreprocessorFailure(controller))
184 .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
185 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
192 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
193 * invokes the call-backs, marks the controller complete, and returns an incomplete
194 * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
197 * Assumes that no callbacks have been invoked yet.
199 * @param controller pipeline controller
200 * @return a function that checks the outcome status and continues, if successful, or
201 * indicates a failure otherwise
203 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
204 PipelineControllerFuture<OperationOutcome> controller) {
208 if (isSuccess(outcome)) {
209 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
210 return CompletableFuture.completedFuture(outcome);
213 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
215 final Executor executor = params.getExecutor();
216 final CallbackManager callbacks = new CallbackManager();
218 // propagate "stop" to the callbacks
219 controller.add(callbacks);
221 final OperationOutcome outcome2 = params.makeOutcome(getTargetEntity());
223 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
225 outcome2.setFinalOutcome(true);
226 outcome2.setResult(OperationResult.FAILURE_GUARD);
227 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
230 CompletableFuture.completedFuture(outcome2)
231 .whenCompleteAsync(callbackStarted(callbacks), executor)
232 .whenCompleteAsync(callbackCompleted(callbacks), executor)
233 .whenCompleteAsync(controller.delayedComplete(), executor);
236 return new CompletableFuture<>();
241 * Invokes the operation's preprocessor step(s) as a "future". This method simply
242 * returns {@code null}.
244 * This method assumes the following:
246 * <li>the operator is alive</li>
247 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
250 * @return a function that will start the preprocessor and returns its outcome, or
251 * {@code null} if this operation needs no preprocessor
253 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
258 * Invokes the operation's guard step(s) as a "future".
260 * This method assumes the following:
262 * <li>the operator is alive</li>
263 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
266 * @return a function that will start the guard checks and returns its outcome, or
267 * {@code null} if this operation has no guard
269 protected CompletableFuture<OperationOutcome> startGuardAsync() {
270 if (params.isPreprocessed()) {
274 // get the guard payload
275 Map<String, Object> payload = makeGuardPayload();
278 * Note: can't use constants from actor.guard, because that would create a
279 * circular dependency.
281 return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
282 .payload(payload).build().start();
286 * Creates a payload to execute a guard operation.
288 * @return a new guard payload
290 protected Map<String, Object> makeGuardPayload() {
291 // TODO delete this once preprocessing is done by the application
292 Map<String, Object> guard = new LinkedHashMap<>();
293 guard.put("actor", params.getActor());
294 guard.put("operation", params.getOperation());
295 guard.put("target", getTargetEntity());
296 guard.put("requestId", params.getRequestId());
298 String clname = params.getContext().getEvent().getClosedLoopControlName();
299 if (clname != null) {
300 guard.put("clname", clname);
307 * Starts the operation attempt, with no preprocessor. When all retries complete, it
308 * will complete the controller.
310 * @param controller controller for all operation attempts
311 * @param attempt attempt number, typically starting with 1
312 * @return a future that will return the final result of all attempts
314 private CompletableFuture<OperationOutcome> startOperationAttempt(
315 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
317 generateSubRequestId(attempt);
319 // propagate "stop" to the operation attempt
320 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
321 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
327 * Generates and sets {@link #subRequestId} to a new subrequest ID.
329 * @param attempt attempt number, typically starting with 1
331 public void generateSubRequestId(int attempt) {
332 // Note: this should be "protected", but that makes junits much messier
334 setSubRequestId(UUID.randomUUID().toString());
338 * Starts the operation attempt, without doing any retries.
340 * @param params operation parameters
341 * @param attempt attempt number, typically starting with 1
342 * @return a future that will return the result of a single operation attempt
344 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
346 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
348 final Executor executor = params.getExecutor();
349 final OperationOutcome outcome = params.makeOutcome(getTargetEntity());
350 final CallbackManager callbacks = new CallbackManager();
352 // this operation attempt gets its own controller
353 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
355 // propagate "stop" to the callbacks
356 controller.add(callbacks);
359 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
360 .whenCompleteAsync(callbackStarted(callbacks), executor)
361 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
364 // handle timeouts, if specified
365 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
366 if (timeoutMillis > 0) {
367 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
368 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
372 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
373 * before callbackCompleted() is invoked.
375 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
376 * the pipeline as the last step anyway.
380 future.exceptionally(fromException("operation"))
381 .thenApply(setRetryFlag(attempt))
382 .whenCompleteAsync(callbackStarted(callbacks), executor)
383 .whenCompleteAsync(callbackCompleted(callbacks), executor)
384 .whenCompleteAsync(controller.delayedComplete(), executor);
391 * Determines if the outcome was successful.
393 * @param outcome outcome to examine
394 * @return {@code true} if the outcome was successful
396 protected boolean isSuccess(OperationOutcome outcome) {
397 return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
401 * Determines if the outcome was a failure for this operator.
403 * @param outcome outcome to examine, or {@code null}
404 * @return {@code true} if the outcome is not {@code null} and was a failure
405 * <i>and</i> was associated with this operator, {@code false} otherwise
407 protected boolean isActorFailed(OperationOutcome outcome) {
408 return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
412 * Determines if the given outcome is for this operation.
414 * @param outcome outcome to examine
415 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
417 protected boolean isSameOperation(OperationOutcome outcome) {
418 return OperationOutcome.isFor(outcome, getActorName(), getName());
422 * Invokes the operation as a "future". This method simply invokes
423 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
424 * returning the result via a "future".
426 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
427 * the executor in the "params", as that may bring the background thread pool to a
428 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
431 * This method assumes the following:
433 * <li>the operator is alive</li>
434 * <li>verifyRunning() has been invoked</li>
435 * <li>callbackStarted() has been invoked</li>
436 * <li>the invoker will perform appropriate timeout checks</li>
437 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
440 * @param attempt attempt number, typically starting with 1
441 * @return a function that will start the operation and return its result when
444 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
446 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
450 * Low-level method that performs the operation. This can make the same assumptions
451 * that are made by {@link #doOperationAsFuture()}. This particular method simply
452 * throws an {@link UnsupportedOperationException}.
454 * @param attempt attempt number, typically starting with 1
455 * @param operation the operation being performed
456 * @return the outcome of the operation
458 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
460 throw new UnsupportedOperationException("start operation " + getFullName());
464 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
465 * FAILURE, assuming the policy specifies retries and the retry count has been
468 * @param attempt latest attempt number, starting with 1
469 * @return a function to get the next future to execute
471 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
473 return origOutcome -> {
474 // ensure we have a non-null outcome
475 OperationOutcome outcome;
476 if (origOutcome != null) {
477 outcome = origOutcome;
479 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
480 outcome = this.setOutcome(params.makeOutcome(getTargetEntity()), OperationResult.FAILURE);
483 // ensure correct actor/operation
484 outcome.setActor(getActorName());
485 outcome.setOperation(getName());
487 // determine if we should retry, based on the result
488 if (outcome.getResult() != OperationResult.FAILURE) {
489 // do not retry success or other failure types (e.g., exception)
490 outcome.setFinalOutcome(true);
494 int retry = getRetry(params.getRetry());
496 // no retries were specified
497 outcome.setFinalOutcome(true);
499 } else if (attempt <= retry) {
500 // have more retries - not the final outcome
501 outcome.setFinalOutcome(false);
505 * retries were specified and we've already tried them all - change to
508 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
509 outcome.setResult(OperationResult.FAILURE_RETRIES);
510 outcome.setFinalOutcome(true);
518 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
519 * was previously invoked, and thus that the "operation" is not {@code null}.
521 * @param controller controller for all of the retries
522 * @param attempt latest attempt number, starting with 1
523 * @return a function to get the next future to execute
525 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
526 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
528 return operation -> {
529 if (!isActorFailed(operation)) {
530 // wrong type or wrong operation - just leave it as is
531 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
532 controller.complete(operation);
533 return new CompletableFuture<>();
536 if (getRetry(params.getRetry()) <= 0) {
537 // no retries - already marked as FAILURE, so just return it
538 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
539 controller.complete(operation);
540 return new CompletableFuture<>();
544 * Retry the operation.
546 long waitMs = getRetryWaitMs();
547 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
549 return sleep(waitMs, TimeUnit.MILLISECONDS)
550 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
555 * Convenience method that starts a sleep(), running via a future.
557 * @param sleepTime time to sleep
558 * @param unit time unit
559 * @return a future that will complete when the sleep completes
561 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
562 if (sleepTime <= 0) {
563 return CompletableFuture.completedFuture(null);
566 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
570 * Converts an exception into an operation outcome, returning a copy of the outcome to
571 * prevent background jobs from changing it.
573 * @param type type of item throwing the exception
574 * @return a function that will convert an exception into an operation outcome
576 private Function<Throwable, OperationOutcome> fromException(String type) {
579 OperationOutcome outcome = params.makeOutcome(getTargetEntity());
581 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
582 // do not include exception in the message, as it just clutters the log
583 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
584 params.getRequestId());
586 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
587 params.getRequestId(), thrown);
590 return setOutcome(outcome, thrown);
595 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
596 * any outstanding futures when one completes.
598 * @param futureMakers function to make a future. If the function returns
599 * {@code null}, then no future is created for that function. On the other
600 * hand, if the function throws an exception, then the previously created
601 * functions are canceled and the exception is re-thrown
602 * @return a future to cancel or await an outcome, or {@code null} if no futures were
603 * created. If this future is canceled, then all of the futures will be
606 public CompletableFuture<OperationOutcome> anyOf(
607 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
609 return anyOf(Arrays.asList(futureMakers));
613 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
614 * any outstanding futures when one completes.
616 * @param futureMakers function to make a future. If the function returns
617 * {@code null}, then no future is created for that function. On the other
618 * hand, if the function throws an exception, then the previously created
619 * functions are canceled and the exception is re-thrown
620 * @return a future to cancel or await an outcome, or {@code null} if no futures were
621 * created. If this future is canceled, then all of the futures will be
622 * canceled. Similarly, when this future completes, any incomplete futures
625 public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
627 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
629 CompletableFuture<OperationOutcome>[] futures =
630 attachFutures(controller, futureMakers, UnaryOperator.identity());
632 if (futures.length == 0) {
633 // no futures were started
637 if (futures.length == 1) {
641 CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
642 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
648 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
650 * @param futureMakers function to make a future. If the function returns
651 * {@code null}, then no future is created for that function. On the other
652 * hand, if the function throws an exception, then the previously created
653 * functions are canceled and the exception is re-thrown
654 * @return a future to cancel or await an outcome, or {@code null} if no futures were
655 * created. If this future is canceled, then all of the futures will be
658 public CompletableFuture<OperationOutcome> allOf(
659 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
661 return allOf(Arrays.asList(futureMakers));
665 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
667 * @param futureMakers function to make a future. If the function returns
668 * {@code null}, then no future is created for that function. On the other
669 * hand, if the function throws an exception, then the previously created
670 * functions are canceled and the exception is re-thrown
671 * @return a future to cancel or await an outcome, or {@code null} if no futures were
672 * created. If this future is canceled, then all of the futures will be
673 * canceled. Similarly, when this future completes, any incomplete futures
676 public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
677 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
679 Queue<OperationOutcome> outcomes = new LinkedList<>();
681 CompletableFuture<OperationOutcome>[] futures =
682 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
683 synchronized (outcomes) {
684 outcomes.add(outcome);
689 if (futures.length == 0) {
690 // no futures were started
694 if (futures.length == 1) {
699 CompletableFuture.allOf(futures)
700 .thenApply(unused -> combineOutcomes(outcomes))
701 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
708 * Invokes the functions to create the futures and attaches them to the controller.
710 * @param controller master controller for all of the futures
711 * @param futureMakers futures to be attached to the controller
712 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
713 * Returns the adorned future
714 * @return an array of futures, possibly zero-length. If the array is of size one,
715 * then that one item should be returned instead of the controller
717 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
718 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
719 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
721 if (futureMakers.isEmpty()) {
722 @SuppressWarnings("unchecked")
723 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
727 // the last, unadorned future that is created
728 CompletableFuture<OperationOutcome> lastFuture = null;
730 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
733 for (var maker : futureMakers) {
735 CompletableFuture<OperationOutcome> future = maker.get();
736 if (future == null) {
740 // propagate "stop" to the future
741 controller.add(future);
743 futures.add(adorn.apply(future));
747 } catch (RuntimeException e) {
748 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
749 controller.cancel(false);
754 @SuppressWarnings("unchecked")
755 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
757 if (result.length == 1) {
758 // special case - return the unadorned future
759 result[0] = lastFuture;
763 return futures.toArray(result);
767 * Combines the outcomes from a set of tasks.
769 * @param outcomes outcomes to be examined
770 * @return the combined outcome
772 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
774 // identify the outcome with the highest priority
775 OperationOutcome outcome = outcomes.remove();
776 int priority = detmPriority(outcome);
778 for (OperationOutcome outcome2 : outcomes) {
779 int priority2 = detmPriority(outcome2);
781 if (priority2 > priority) {
783 priority = priority2;
787 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
788 (outcome == null ? null : outcome.getResult()), params.getRequestId());
// Maps an outcome's result to an integer priority; combineOutcomes() keeps the outcome
// whose priority is highest.
// NOTE(review): the return statements and some case labels of this method are not present
// in the reviewed text, so the actual priority values cannot be documented here - confirm
// against the repository copy before relying on any particular ordering.
794 * Determines the priority of an outcome based on its result.
796 * @param outcome outcome to examine, or {@code null}
797 * @return the outcome's priority
799 protected int detmPriority(OperationOutcome outcome) {
// a null outcome, or one without a result, is handled separately, before the switch
800 if (outcome == null || outcome.getResult() == null) {
804 switch (outcome.getResult()) {
811 case FAILURE_RETRIES:
817 case FAILURE_TIMEOUT:
820 case FAILURE_EXCEPTION:
827 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
828 * not created until the previous task completes. The pipeline returns the outcome of
829 * the last task executed.
831 * @param futureMakers functions to make the futures
832 * @return a future to cancel the sequence or await the outcome
834 public CompletableFuture<OperationOutcome> sequence(
835 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
837 return sequence(Arrays.asList(futureMakers));
841 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
842 * not created until the previous task completes. The pipeline returns the outcome of
843 * the last task executed.
845 * @param futureMakers functions to make the futures
846 * @return a future to cancel the sequence or await the outcome, or {@code null} if
847 * there were no tasks to perform
849 public CompletableFuture<OperationOutcome> sequence(
850 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
852 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
854 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
855 if (nextTask == null) {
860 if (queue.isEmpty()) {
861 // only one task - just return it rather than wrapping it in a controller
866 * multiple tasks - need a controller to stop whichever task is currently
869 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
870 final Executor executor = params.getExecutor();
873 controller.wrap(nextTask)
874 .thenCompose(nextTaskOnSuccess(controller, queue))
875 .whenCompleteAsync(controller.delayedComplete(), executor);
882 * Executes the next task in the queue, if the previous outcome was successful.
884 * @param controller pipeline controller
885 * @param taskQueue queue of tasks to be performed
886 * @return a future to execute the remaining tasks, or the current outcome, if it's a
887 * failure, or if there are no more tasks
889 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
890 PipelineControllerFuture<OperationOutcome> controller,
891 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
894 if (!isSuccess(outcome)) {
895 // return the failure
896 return CompletableFuture.completedFuture(outcome);
899 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
900 if (nextTask == null) {
901 // no tasks - just return the success
902 return CompletableFuture.completedFuture(outcome);
908 .thenCompose(nextTaskOnSuccess(controller, taskQueue));
914 * Gets the next task from the queue, skipping those that are {@code null}.
916 * @param taskQueue task queue
917 * @return the next task, or {@code null} if the queue is now empty
919 private CompletableFuture<OperationOutcome> getNextTask(
920 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
922 Supplier<CompletableFuture<OperationOutcome>> maker;
924 while ((maker = taskQueue.poll()) != null) {
925 CompletableFuture<OperationOutcome> future = maker.get();
926 if (future != null) {
935 * Sets the start time of the operation and invokes the callback to indicate that the
936 * operation has started. Does nothing if the pipeline has been stopped.
938 * This assumes that the "outcome" is not {@code null}.
940 * @param callbacks used to determine if the start callback can be invoked
941 * @return a function that sets the start time and invokes the callback
943 protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
945 return (outcome, thrown) -> {
947 if (callbacks.canStart()) {
948 outcome.setSubRequestId(getSubRequestId());
949 outcome.setStart(callbacks.getStartTime());
950 outcome.setEnd(null);
952 // pass a copy to the callback
953 OperationOutcome outcome2 = new OperationOutcome(outcome);
954 outcome2.setFinalOutcome(false);
955 params.callbackStarted(outcome2);
961 * Sets the end time of the operation and invokes the callback to indicate that the
962 * operation has completed. Does nothing if the pipeline has been stopped.
964 * This assumes that the "outcome" is not {@code null}.
966 * Note: the start time must be a reference rather than a plain value, because it's
967 * value must be gotten on-demand, when the returned function is executed at a later
970 * @param callbacks used to determine if the end callback can be invoked
971 * @return a function that sets the end time and invokes the callback
973 protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
975 return (outcome, thrown) -> {
976 if (callbacks.canEnd()) {
977 outcome.setSubRequestId(getSubRequestId());
978 outcome.setStart(callbacks.getStartTime());
979 outcome.setEnd(callbacks.getEndTime());
981 // pass a copy to the callback
982 params.callbackCompleted(new OperationOutcome(outcome));
988 * Sets an operation's outcome and message, based on a throwable.
990 * @param operation operation to be updated
991 * @return the updated operation
993 public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
994 OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
995 : OperationResult.FAILURE_EXCEPTION);
996 return setOutcome(operation, result);
1000 * Sets an operation's outcome and default message based on the result.
1002 * @param operation operation to be updated
1003 * @param result result of the operation
1004 * @return the updated operation
1006 public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
1007 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
1008 operation.setResult(result);
1009 operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
1010 : ControlLoopOperation.FAILED_MSG);
1016 * Determines if a throwable is due to a timeout.
1018 * @param thrown throwable of interest
1019 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
1021 protected boolean isTimeout(Throwable thrown) {
1022 if (thrown instanceof CompletionException) {
1023 thrown = thrown.getCause();
1026 return (thrown instanceof TimeoutException);
1030 * Logs a message. If the message is not of type, String, then it attempts to
1031 * pretty-print it into JSON before logging.
1033 * @param direction IN or OUT
1034 * @param infra communication infrastructure on which it was published
1035 * @param source source name (e.g., the URL or Topic name)
1036 * @param message message to be logged
1037 * @return the JSON text that was logged
1039 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
1042 json = prettyPrint(message);
1044 } catch (IllegalArgumentException e) {
1045 String type = (direction == EventType.IN ? "response" : "request");
1046 logger.warn("cannot pretty-print {}", type, e);
1047 json = message.toString();
1050 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1056 * Converts a message to a "pretty-printed" String using the operation's normal
1057 * serialization provider (i.e., it's <i>coder</i>).
1059 * @param message response to be logged
1060 * @return the JSON text that was logged
1061 * @throws IllegalArgumentException if the message cannot be converted
1063 public <T> String prettyPrint(T message) {
1064 if (message == null) {
1066 } else if (message instanceof String) {
1067 return message.toString();
1070 return getCoder().encode(message, true);
1071 } catch (CoderException e) {
1072 throw new IllegalArgumentException("cannot encode message", e);
1077 // these may be overridden by subclasses or junit tests
1080 * Gets the retry count.
1082 * @param retry retry, extracted from the parameters, or {@code null}
1083 * @return the number of retries, or {@code 0} if no retries were specified
1085 protected int getRetry(Integer retry) {
1086 return (retry == null ? 0 : retry);
1090 * Gets the retry wait, in milliseconds.
1092 * @return the retry wait, in milliseconds
1094 protected long getRetryWaitMs() {
1095 return DEFAULT_RETRY_WAIT_MS;
1099 * Gets the target entity, first trying the properties and then the parameters.
1101 * @return the target entity
1103 protected String getTargetEntity() {
1104 String targetEntity = getProperty(OperationProperties.AAI_TARGET_ENTITY);
1105 return (targetEntity != null ? targetEntity : params.getTargetEntity());
1109 * Gets the operation timeout.
1111 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1113 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1116 protected long getTimeoutMs(Integer timeoutSec) {
1117 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1120 // these may be overridden by junit tests
1122 protected Coder getCoder() {