/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Partial implementation of an operator. In general, it's preferable that subclasses
 * would override {@link #startOperationAsync(int, OperationOutcome)
 * startOperationAsync()}. However, if that proves to be too difficult, then they can
 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
 * if the operation requires any preprocessor steps, the subclass may choose to override
 * {@link #startPreprocessorAsync()}.
 *
 * <p>The futures returned by the methods within this class can be canceled, and will
 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
 * returned by overridden methods will do the same. Of course, if a class overrides
 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
 * be done to cancel that particular operation.
 *
 * <p>In general tasks in a pipeline are executed by the same thread. However, the following
 * should always be executed via the executor specified in "params":
 * <ul>
 * <li>start callback</li>
 * <li>completion callback</li>
 * <li>controller completion (i.e., delayedComplete())</li>
 * </ul>
 */
86 public abstract class OperationPartial implements Operation {
87 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
88 private static final Coder coder = new StandardCoder();
90 public static final String GUARD_ACTOR_NAME = "GUARD";
91 public static final String GUARD_OPERATION_NAME = "Decision";
92 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
94 private final OperatorConfig config;
97 * Operation parameters.
99 protected final ControlLoopOperationParams params;
102 private final String fullName;
105 @Setter(AccessLevel.PROTECTED)
106 private String subRequestId;
109 private final List<String> propertyNames;
112 * Values for the properties identified by {@link #getPropertyNames()}.
114 private final Map<String, Object> properties = new HashMap<>();
/**
 * Constructs the object.
 *
 * @param params operation parameters
 * @param config configuration for this operation
 * @param propertyNames names of properties required by this operation
 */
public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
    this.params = params;
    this.config = config;
    this.fullName = params.getActor() + "." + params.getOperation();
    this.propertyNames = propertyNames;
}
131 public Executor getBlockingExecutor() {
132 return config.getBlockingExecutor();
135 public String getActorName() {
136 return params.getActor();
139 public String getName() {
140 return params.getOperation();
144 public boolean containsProperty(String name) {
145 return properties.containsKey(name);
149 public void setProperty(String name, Object value) {
150 logger.info("{}: set property {}={}", getFullName(), name, value);
151 properties.put(name, value);
154 @SuppressWarnings("unchecked")
156 public <T> T getProperty(String name) {
157 return (T) properties.get(name);
161 public CompletableFuture<OperationOutcome> start() {
162 // allocate a controller for the entire operation
163 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
165 CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
166 if (preproc == null) {
167 // no preprocessor required - just start the operation
168 return startOperationAttempt(controller, 1);
172 * Do preprocessor first and then, if successful, start the operation. Note:
173 * operations create their own outcome, ignoring the outcome from any previous
176 * Wrap the preprocessor to ensure "stop" is propagated to it.
179 controller.wrap(preproc)
180 .exceptionally(fromException("preprocessor of operation"))
181 .thenCompose(handlePreprocessorFailure(controller))
182 .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
183 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
190 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
191 * invokes the call-backs, marks the controller complete, and returns an incomplete
192 * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
195 * Assumes that no callbacks have been invoked yet.
197 * @param controller pipeline controller
198 * @return a function that checks the outcome status and continues, if successful, or
199 * indicates a failure otherwise
201 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
202 PipelineControllerFuture<OperationOutcome> controller) {
206 if (isSuccess(outcome)) {
207 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
208 return CompletableFuture.completedFuture(outcome);
211 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
213 final Executor executor = params.getExecutor();
214 final CallbackManager callbacks = new CallbackManager();
216 // propagate "stop" to the callbacks
217 controller.add(callbacks);
219 final OperationOutcome outcome2 = params.makeOutcome(getTargetEntity());
221 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
223 outcome2.setFinalOutcome(true);
224 outcome2.setResult(PolicyResult.FAILURE_GUARD);
225 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
228 CompletableFuture.completedFuture(outcome2)
229 .whenCompleteAsync(callbackStarted(callbacks), executor)
230 .whenCompleteAsync(callbackCompleted(callbacks), executor)
231 .whenCompleteAsync(controller.delayedComplete(), executor);
234 return new CompletableFuture<>();
239 * Invokes the operation's preprocessor step(s) as a "future". This method simply
240 * returns {@code null}.
242 * This method assumes the following:
244 * <li>the operator is alive</li>
245 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
248 * @return a function that will start the preprocessor and returns its outcome, or
249 * {@code null} if this operation needs no preprocessor
251 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
256 * Invokes the operation's guard step(s) as a "future".
258 * This method assumes the following:
260 * <li>the operator is alive</li>
261 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
264 * @return a function that will start the guard checks and returns its outcome, or
265 * {@code null} if this operation has no guard
267 protected CompletableFuture<OperationOutcome> startGuardAsync() {
268 if (params.isPreprocessed()) {
272 // get the guard payload
273 Map<String, Object> payload = makeGuardPayload();
276 * Note: can't use constants from actor.guard, because that would create a
277 * circular dependency.
279 return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
280 .payload(payload).build().start();
284 * Creates a payload to execute a guard operation.
286 * @return a new guard payload
288 protected Map<String, Object> makeGuardPayload() {
289 // TODO delete this once preprocessing is done by the application
290 Map<String, Object> guard = new LinkedHashMap<>();
291 guard.put("actor", params.getActor());
292 guard.put("operation", params.getOperation());
293 guard.put("target", getTargetEntity());
294 guard.put("requestId", params.getRequestId());
296 String clname = params.getContext().getEvent().getClosedLoopControlName();
297 if (clname != null) {
298 guard.put("clname", clname);
305 * Starts the operation attempt, with no preprocessor. When all retries complete, it
306 * will complete the controller.
308 * @param controller controller for all operation attempts
309 * @param attempt attempt number, typically starting with 1
310 * @return a future that will return the final result of all attempts
312 private CompletableFuture<OperationOutcome> startOperationAttempt(
313 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
315 generateSubRequestId(attempt);
317 // propagate "stop" to the operation attempt
318 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
319 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
325 * Generates and sets {@link #subRequestId} to a new subrequest ID.
327 * @param attempt attempt number, typically starting with 1
329 public void generateSubRequestId(int attempt) {
330 // Note: this should be "protected", but that makes junits much messier
332 setSubRequestId(UUID.randomUUID().toString());
336 * Starts the operation attempt, without doing any retries.
338 * @param params operation parameters
339 * @param attempt attempt number, typically starting with 1
340 * @return a future that will return the result of a single operation attempt
342 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
344 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
346 final Executor executor = params.getExecutor();
347 final OperationOutcome outcome = params.makeOutcome(getTargetEntity());
348 final CallbackManager callbacks = new CallbackManager();
350 // this operation attempt gets its own controller
351 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
353 // propagate "stop" to the callbacks
354 controller.add(callbacks);
357 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
358 .whenCompleteAsync(callbackStarted(callbacks), executor)
359 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
362 // handle timeouts, if specified
363 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
364 if (timeoutMillis > 0) {
365 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
366 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
370 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
371 * before callbackCompleted() is invoked.
373 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
374 * the pipeline as the last step anyway.
378 future.exceptionally(fromException("operation"))
379 .thenApply(setRetryFlag(attempt))
380 .whenCompleteAsync(callbackStarted(callbacks), executor)
381 .whenCompleteAsync(callbackCompleted(callbacks), executor)
382 .whenCompleteAsync(controller.delayedComplete(), executor);
389 * Determines if the outcome was successful.
391 * @param outcome outcome to examine
392 * @return {@code true} if the outcome was successful
394 protected boolean isSuccess(OperationOutcome outcome) {
395 return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
399 * Determines if the outcome was a failure for this operator.
401 * @param outcome outcome to examine, or {@code null}
402 * @return {@code true} if the outcome is not {@code null} and was a failure
403 * <i>and</i> was associated with this operator, {@code false} otherwise
405 protected boolean isActorFailed(OperationOutcome outcome) {
406 return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
410 * Determines if the given outcome is for this operation.
412 * @param outcome outcome to examine
413 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
415 protected boolean isSameOperation(OperationOutcome outcome) {
416 return OperationOutcome.isFor(outcome, getActorName(), getName());
420 * Invokes the operation as a "future". This method simply invokes
421 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
422 * returning the result via a "future".
424 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
425 * the executor in the "params", as that may bring the background thread pool to a
426 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
429 * This method assumes the following:
431 * <li>the operator is alive</li>
432 * <li>verifyRunning() has been invoked</li>
433 * <li>callbackStarted() has been invoked</li>
434 * <li>the invoker will perform appropriate timeout checks</li>
435 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
438 * @param attempt attempt number, typically starting with 1
439 * @return a function that will start the operation and return its result when
442 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
444 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
448 * Low-level method that performs the operation. This can make the same assumptions
449 * that are made by {@link #doOperationAsFuture()}. This particular method simply
450 * throws an {@link UnsupportedOperationException}.
452 * @param attempt attempt number, typically starting with 1
453 * @param operation the operation being performed
454 * @return the outcome of the operation
456 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
458 throw new UnsupportedOperationException("start operation " + getFullName());
462 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
463 * FAILURE, assuming the policy specifies retries and the retry count has been
466 * @param attempt latest attempt number, starting with 1
467 * @return a function to get the next future to execute
469 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
471 return origOutcome -> {
472 // ensure we have a non-null outcome
473 OperationOutcome outcome;
474 if (origOutcome != null) {
475 outcome = origOutcome;
477 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
478 outcome = this.setOutcome(params.makeOutcome(getTargetEntity()), PolicyResult.FAILURE);
481 // ensure correct actor/operation
482 outcome.setActor(getActorName());
483 outcome.setOperation(getName());
485 // determine if we should retry, based on the result
486 if (outcome.getResult() != PolicyResult.FAILURE) {
487 // do not retry success or other failure types (e.g., exception)
488 outcome.setFinalOutcome(true);
492 int retry = getRetry(params.getRetry());
494 // no retries were specified
495 outcome.setFinalOutcome(true);
497 } else if (attempt <= retry) {
498 // have more retries - not the final outcome
499 outcome.setFinalOutcome(false);
503 * retries were specified and we've already tried them all - change to
506 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
507 outcome.setResult(PolicyResult.FAILURE_RETRIES);
508 outcome.setFinalOutcome(true);
516 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
517 * was previously invoked, and thus that the "operation" is not {@code null}.
519 * @param controller controller for all of the retries
520 * @param attempt latest attempt number, starting with 1
521 * @return a function to get the next future to execute
523 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
524 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
526 return operation -> {
527 if (!isActorFailed(operation)) {
528 // wrong type or wrong operation - just leave it as is
529 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
530 controller.complete(operation);
531 return new CompletableFuture<>();
534 if (getRetry(params.getRetry()) <= 0) {
535 // no retries - already marked as FAILURE, so just return it
536 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
537 controller.complete(operation);
538 return new CompletableFuture<>();
542 * Retry the operation.
544 long waitMs = getRetryWaitMs();
545 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
547 return sleep(waitMs, TimeUnit.MILLISECONDS)
548 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
553 * Convenience method that starts a sleep(), running via a future.
555 * @param sleepTime time to sleep
556 * @param unit time unit
557 * @return a future that will complete when the sleep completes
559 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
560 if (sleepTime <= 0) {
561 return CompletableFuture.completedFuture(null);
564 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
568 * Converts an exception into an operation outcome, returning a copy of the outcome to
569 * prevent background jobs from changing it.
571 * @param type type of item throwing the exception
572 * @return a function that will convert an exception into an operation outcome
574 private Function<Throwable, OperationOutcome> fromException(String type) {
577 OperationOutcome outcome = params.makeOutcome(getTargetEntity());
579 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
580 // do not include exception in the message, as it just clutters the log
581 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
582 params.getRequestId());
584 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
585 params.getRequestId(), thrown);
588 return setOutcome(outcome, thrown);
593 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
594 * any outstanding futures when one completes.
596 * @param futureMakers function to make a future. If the function returns
597 * {@code null}, then no future is created for that function. On the other
598 * hand, if the function throws an exception, then the previously created
599 * functions are canceled and the exception is re-thrown
600 * @return a future to cancel or await an outcome, or {@code null} if no futures were
601 * created. If this future is canceled, then all of the futures will be
604 public CompletableFuture<OperationOutcome> anyOf(
605 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
607 return anyOf(Arrays.asList(futureMakers));
611 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
612 * any outstanding futures when one completes.
614 * @param futureMakers function to make a future. If the function returns
615 * {@code null}, then no future is created for that function. On the other
616 * hand, if the function throws an exception, then the previously created
617 * functions are canceled and the exception is re-thrown
618 * @return a future to cancel or await an outcome, or {@code null} if no futures were
619 * created. If this future is canceled, then all of the futures will be
620 * canceled. Similarly, when this future completes, any incomplete futures
623 public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
625 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
627 CompletableFuture<OperationOutcome>[] futures =
628 attachFutures(controller, futureMakers, UnaryOperator.identity());
630 if (futures.length == 0) {
631 // no futures were started
635 if (futures.length == 1) {
639 CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
640 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
646 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
648 * @param futureMakers function to make a future. If the function returns
649 * {@code null}, then no future is created for that function. On the other
650 * hand, if the function throws an exception, then the previously created
651 * functions are canceled and the exception is re-thrown
652 * @return a future to cancel or await an outcome, or {@code null} if no futures were
653 * created. If this future is canceled, then all of the futures will be
656 public CompletableFuture<OperationOutcome> allOf(
657 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
659 return allOf(Arrays.asList(futureMakers));
663 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
665 * @param futureMakers function to make a future. If the function returns
666 * {@code null}, then no future is created for that function. On the other
667 * hand, if the function throws an exception, then the previously created
668 * functions are canceled and the exception is re-thrown
669 * @return a future to cancel or await an outcome, or {@code null} if no futures were
670 * created. If this future is canceled, then all of the futures will be
671 * canceled. Similarly, when this future completes, any incomplete futures
674 public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
675 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
677 Queue<OperationOutcome> outcomes = new LinkedList<>();
679 CompletableFuture<OperationOutcome>[] futures =
680 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
681 synchronized (outcomes) {
682 outcomes.add(outcome);
687 if (futures.length == 0) {
688 // no futures were started
692 if (futures.length == 1) {
697 CompletableFuture.allOf(futures)
698 .thenApply(unused -> combineOutcomes(outcomes))
699 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
706 * Invokes the functions to create the futures and attaches them to the controller.
708 * @param controller master controller for all of the futures
709 * @param futureMakers futures to be attached to the controller
710 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
711 * Returns the adorned future
712 * @return an array of futures, possibly zero-length. If the array is of size one,
713 * then that one item should be returned instead of the controller
715 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
716 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
717 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
719 if (futureMakers.isEmpty()) {
720 @SuppressWarnings("unchecked")
721 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
725 // the last, unadorned future that is created
726 CompletableFuture<OperationOutcome> lastFuture = null;
728 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
731 for (var maker : futureMakers) {
733 CompletableFuture<OperationOutcome> future = maker.get();
734 if (future == null) {
738 // propagate "stop" to the future
739 controller.add(future);
741 futures.add(adorn.apply(future));
745 } catch (RuntimeException e) {
746 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
747 controller.cancel(false);
752 @SuppressWarnings("unchecked")
753 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
755 if (result.length == 1) {
756 // special case - return the unadorned future
757 result[0] = lastFuture;
761 return futures.toArray(result);
765 * Combines the outcomes from a set of tasks.
767 * @param outcomes outcomes to be examined
768 * @return the combined outcome
770 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
772 // identify the outcome with the highest priority
773 OperationOutcome outcome = outcomes.remove();
774 int priority = detmPriority(outcome);
776 for (OperationOutcome outcome2 : outcomes) {
777 int priority2 = detmPriority(outcome2);
779 if (priority2 > priority) {
781 priority = priority2;
785 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
786 (outcome == null ? null : outcome.getResult()), params.getRequestId());
792 * Determines the priority of an outcome based on its result.
794 * @param outcome outcome to examine, or {@code null}
795 * @return the outcome's priority
797 protected int detmPriority(OperationOutcome outcome) {
798 if (outcome == null || outcome.getResult() == null) {
802 switch (outcome.getResult()) {
809 case FAILURE_RETRIES:
815 case FAILURE_TIMEOUT:
818 case FAILURE_EXCEPTION:
825 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
826 * not created until the previous task completes. The pipeline returns the outcome of
827 * the last task executed.
829 * @param futureMakers functions to make the futures
830 * @return a future to cancel the sequence or await the outcome
832 public CompletableFuture<OperationOutcome> sequence(
833 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
835 return sequence(Arrays.asList(futureMakers));
839 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
840 * not created until the previous task completes. The pipeline returns the outcome of
841 * the last task executed.
843 * @param futureMakers functions to make the futures
844 * @return a future to cancel the sequence or await the outcome, or {@code null} if
845 * there were no tasks to perform
847 public CompletableFuture<OperationOutcome> sequence(
848 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
850 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
852 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
853 if (nextTask == null) {
858 if (queue.isEmpty()) {
859 // only one task - just return it rather than wrapping it in a controller
864 * multiple tasks - need a controller to stop whichever task is currently
867 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
868 final Executor executor = params.getExecutor();
871 controller.wrap(nextTask)
872 .thenCompose(nextTaskOnSuccess(controller, queue))
873 .whenCompleteAsync(controller.delayedComplete(), executor);
880 * Executes the next task in the queue, if the previous outcome was successful.
882 * @param controller pipeline controller
883 * @param taskQueue queue of tasks to be performed
884 * @return a future to execute the remaining tasks, or the current outcome, if it's a
885 * failure, or if there are no more tasks
887 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
888 PipelineControllerFuture<OperationOutcome> controller,
889 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
892 if (!isSuccess(outcome)) {
893 // return the failure
894 return CompletableFuture.completedFuture(outcome);
897 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
898 if (nextTask == null) {
899 // no tasks - just return the success
900 return CompletableFuture.completedFuture(outcome);
906 .thenCompose(nextTaskOnSuccess(controller, taskQueue));
912 * Gets the next task from the queue, skipping those that are {@code null}.
914 * @param taskQueue task queue
915 * @return the next task, or {@code null} if the queue is now empty
917 private CompletableFuture<OperationOutcome> getNextTask(
918 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
920 Supplier<CompletableFuture<OperationOutcome>> maker;
922 while ((maker = taskQueue.poll()) != null) {
923 CompletableFuture<OperationOutcome> future = maker.get();
924 if (future != null) {
933 * Sets the start time of the operation and invokes the callback to indicate that the
934 * operation has started. Does nothing if the pipeline has been stopped.
936 * This assumes that the "outcome" is not {@code null}.
938 * @param callbacks used to determine if the start callback can be invoked
939 * @return a function that sets the start time and invokes the callback
941 protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
943 return (outcome, thrown) -> {
945 if (callbacks.canStart()) {
946 outcome.setSubRequestId(getSubRequestId());
947 outcome.setStart(callbacks.getStartTime());
948 outcome.setEnd(null);
950 // pass a copy to the callback
951 OperationOutcome outcome2 = new OperationOutcome(outcome);
952 outcome2.setFinalOutcome(false);
953 params.callbackStarted(outcome2);
959 * Sets the end time of the operation and invokes the callback to indicate that the
960 * operation has completed. Does nothing if the pipeline has been stopped.
962 * This assumes that the "outcome" is not {@code null}.
964 * Note: the start time must be a reference rather than a plain value, because it's
965 * value must be gotten on-demand, when the returned function is executed at a later
968 * @param callbacks used to determine if the end callback can be invoked
969 * @return a function that sets the end time and invokes the callback
971 protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
973 return (outcome, thrown) -> {
974 if (callbacks.canEnd()) {
975 outcome.setSubRequestId(getSubRequestId());
976 outcome.setStart(callbacks.getStartTime());
977 outcome.setEnd(callbacks.getEndTime());
979 // pass a copy to the callback
980 params.callbackCompleted(new OperationOutcome(outcome));
986 * Sets an operation's outcome and message, based on a throwable.
988 * @param operation operation to be updated
989 * @return the updated operation
991 public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
992 PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
993 return setOutcome(operation, result);
997 * Sets an operation's outcome and default message based on the result.
999 * @param operation operation to be updated
1000 * @param result result of the operation
1001 * @return the updated operation
1003 public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
1004 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
1005 operation.setResult(result);
1006 operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
1007 : ControlLoopOperation.FAILED_MSG);
1013 * Determines if a throwable is due to a timeout.
1015 * @param thrown throwable of interest
1016 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
1018 protected boolean isTimeout(Throwable thrown) {
1019 if (thrown instanceof CompletionException) {
1020 thrown = thrown.getCause();
1023 return (thrown instanceof TimeoutException);
1027 * Logs a message. If the message is not of type, String, then it attempts to
1028 * pretty-print it into JSON before logging.
1030 * @param direction IN or OUT
1031 * @param infra communication infrastructure on which it was published
1032 * @param source source name (e.g., the URL or Topic name)
1033 * @param message message to be logged
1034 * @return the JSON text that was logged
1036 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
1039 json = prettyPrint(message);
1041 } catch (IllegalArgumentException e) {
1042 String type = (direction == EventType.IN ? "response" : "request");
1043 logger.warn("cannot pretty-print {}", type, e);
1044 json = message.toString();
1047 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1053 * Converts a message to a "pretty-printed" String using the operation's normal
1054 * serialization provider (i.e., it's <i>coder</i>).
1056 * @param message response to be logged
1057 * @return the JSON text that was logged
1058 * @throws IllegalArgumentException if the message cannot be converted
1060 public <T> String prettyPrint(T message) {
1061 if (message == null) {
1063 } else if (message instanceof String) {
1064 return message.toString();
1067 return getCoder().encode(message, true);
1068 } catch (CoderException e) {
1069 throw new IllegalArgumentException("cannot encode message", e);
1074 // these may be overridden by subclasses or junit tests
1077 * Gets the retry count.
1079 * @param retry retry, extracted from the parameters, or {@code null}
1080 * @return the number of retries, or {@code 0} if no retries were specified
1082 protected int getRetry(Integer retry) {
1083 return (retry == null ? 0 : retry);
1087 * Gets the retry wait, in milliseconds.
1089 * @return the retry wait, in milliseconds
1091 protected long getRetryWaitMs() {
1092 return DEFAULT_RETRY_WAIT_MS;
1096 * Gets the target entity, first trying the properties and then the parameters.
1098 * @return the target entity
1100 protected String getTargetEntity() {
1101 String targetEntity = getProperty(OperationProperties.AAI_TARGET_ENTITY);
1102 return (targetEntity != null ? targetEntity : params.getTargetEntity());
1106 * Gets the operation timeout.
1108 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1110 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1113 protected long getTimeoutMs(Integer timeoutSec) {
1114 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1117 // these may be overridden by junit tests
1119 protected Coder getCoder() {