2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedList;
28 import java.util.List;
30 import java.util.Queue;
31 import java.util.UUID;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.CompletionException;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.function.BiConsumer;
39 import java.util.function.Function;
40 import java.util.function.Supplier;
41 import java.util.function.UnaryOperator;
42 import lombok.AccessLevel;
45 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
46 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
48 import org.onap.policy.common.utils.coder.Coder;
49 import org.onap.policy.common.utils.coder.CoderException;
50 import org.onap.policy.common.utils.coder.StandardCoder;
51 import org.onap.policy.controlloop.ControlLoopOperation;
52 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
53 import org.onap.policy.controlloop.actorserviceprovider.Operation;
54 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
59 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
64 * Partial implementation of an operator. In general, it's preferable that subclasses
65 * would override {@link #startOperationAsync(int, OperationOutcome)
66 * startOperationAsync()}. However, if that proves to be too difficult, then they can
67 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}.
69 * The futures returned by the methods within this class can be canceled, and will
70 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
71 * returned by overridden methods will do the same. Of course, if a class overrides
72 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
73 * be done to cancel that particular operation.
75 * In general tasks in a pipeline are executed by the same thread. However, the following
76 * should always be executed via the executor specified in "params":
78 * <li>start callback</li>
79 * <li>completion callback</li>
80 * <li>controller completion (i.e., delayedComplete())</li>
83 public abstract class OperationPartial implements Operation {
84 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
85 private static final Coder coder = new StandardCoder();
87 public static final String GUARD_ACTOR_NAME = "GUARD";
88 public static final String GUARD_OPERATION_NAME = "Decision";
89 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
91 private final OperatorConfig config;
94 * Operation parameters.
96 protected final ControlLoopOperationParams params;
99 private final String fullName;
102 @Setter(AccessLevel.PROTECTED)
103 private String subRequestId;
106 private final List<String> propertyNames;
109 * Values for the properties identified by {@link #getPropertyNames()}.
111 private final Map<String, Object> properties = new HashMap<>();
115 * Constructs the object.
117 * @param params operation parameters
118 * @param config configuration for this operation
119 * @param propertyNames names of properties required by this operation
121 protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
122 this.params = params;
123 this.config = config;
124 this.fullName = params.getActor() + "." + params.getOperation();
125 this.propertyNames = propertyNames;
128 public Executor getBlockingExecutor() {
129 return config.getBlockingExecutor();
133 public String getActorName() {
134 return params.getActor();
138 public String getName() {
139 return params.getOperation();
143 public boolean containsProperty(String name) {
144 return properties.containsKey(name);
148 public void setProperty(String name, Object value) {
149 logger.info("{}: set property {}={}", getFullName(), name, value);
150 properties.put(name, value);
153 @SuppressWarnings("unchecked")
155 public <T> T getProperty(String name) {
156 return (T) properties.get(name);
160 * Gets a property value, throwing an exception if it's missing.
162 * @param name property name
163 * @param propertyType property type, used in an error message if the property value
165 * @return the property value
167 @SuppressWarnings("unchecked")
168 protected <T> T getRequiredProperty(String name, String propertyType) {
169 T value = (T) properties.get(name);
171 throw new IllegalStateException("missing " + propertyType);
178 public CompletableFuture<OperationOutcome> start() {
179 // allocate a controller for the entire operation
180 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
183 return startOperationAttempt(controller, 1);
187 * Starts the operation attempt. When all retries complete, it will complete the
190 * @param controller controller for all operation attempts
191 * @param attempt attempt number, typically starting with 1
192 * @return a future that will return the final result of all attempts
194 private CompletableFuture<OperationOutcome> startOperationAttempt(
195 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
197 generateSubRequestId(attempt);
199 // propagate "stop" to the operation attempt
200 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
201 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
207 * Generates and sets {@link #subRequestId} to a new subrequest ID.
209 * @param attempt attempt number, typically starting with 1
211 public void generateSubRequestId(int attempt) {
212 // Note: this should be "protected", but that makes junits much messier
214 setSubRequestId(UUID.randomUUID().toString());
218 * Starts the operation attempt, without doing any retries.
220 * @param attempt attempt number, typically starting with 1
221 * @return a future that will return the result of a single operation attempt
223 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
225 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
227 final Executor executor = params.getExecutor();
228 final OperationOutcome outcome = makeOutcome();
229 final CallbackManager callbacks = new CallbackManager();
231 // this operation attempt gets its own controller
232 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
234 // propagate "stop" to the callbacks
235 controller.add(callbacks);
238 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
239 .whenCompleteAsync(callbackStarted(callbacks), executor)
240 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
243 // handle timeouts, if specified
244 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
245 if (timeoutMillis > 0) {
246 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
247 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
251 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
252 * before callbackCompleted() is invoked.
254 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
255 * the pipeline as the last step anyway.
259 future.exceptionally(fromException("operation"))
260 .thenApply(setRetryFlag(attempt))
261 .whenCompleteAsync(callbackStarted(callbacks), executor)
262 .whenCompleteAsync(callbackCompleted(callbacks), executor)
263 .whenCompleteAsync(controller.delayedComplete(), executor);
270 * Determines if the outcome was successful.
272 * @param outcome outcome to examine
273 * @return {@code true} if the outcome was successful
275 protected boolean isSuccess(OperationOutcome outcome) {
276 return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
280 * Determines if the outcome was a failure for this operator.
282 * @param outcome outcome to examine, or {@code null}
283 * @return {@code true} if the outcome is not {@code null} and was a failure
284 * <i>and</i> was associated with this operator, {@code false} otherwise
286 protected boolean isActorFailed(OperationOutcome outcome) {
287 return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
291 * Determines if the given outcome is for this operation.
293 * @param outcome outcome to examine
294 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
296 protected boolean isSameOperation(OperationOutcome outcome) {
297 return OperationOutcome.isFor(outcome, getActorName(), getName());
301 * Invokes the operation as a "future". This method simply invokes
302 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
303 * returning the result via a "future".
305 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
306 * the executor in the "params", as that may bring the background thread pool to a
307 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
310 * This method assumes the following:
312 * <li>the operator is alive</li>
313 * <li>verifyRunning() has been invoked</li>
314 * <li>callbackStarted() has been invoked</li>
315 * <li>the invoker will perform appropriate timeout checks</li>
316 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
319 * @param attempt attempt number, typically starting with 1
320 * @return a function that will start the operation and return its result when
323 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
325 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
329 * Low-level method that performs the operation. This can make the same assumptions
330 * that are made by {@link #doOperationAsFuture()}. This particular method simply
331 * throws an {@link UnsupportedOperationException}.
333 * @param attempt attempt number, typically starting with 1
334 * @param operation the operation being performed
335 * @return the outcome of the operation
337 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
339 throw new UnsupportedOperationException("start operation " + getFullName());
343 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
344 * FAILURE, assuming the policy specifies retries and the retry count has been
347 * @param attempt latest attempt number, starting with 1
348 * @return a function to get the next future to execute
350 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
352 return origOutcome -> {
353 // ensure we have a non-null outcome
354 OperationOutcome outcome;
355 if (origOutcome != null) {
356 outcome = origOutcome;
358 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
359 outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE);
362 // ensure correct actor/operation
363 outcome.setActor(getActorName());
364 outcome.setOperation(getName());
366 // determine if we should retry, based on the result
367 if (outcome.getResult() != OperationResult.FAILURE) {
368 // do not retry success or other failure types (e.g., exception)
369 outcome.setFinalOutcome(true);
373 int retry = getRetry(params.getRetry());
375 // no retries were specified
376 outcome.setFinalOutcome(true);
378 } else if (attempt <= retry) {
379 // have more retries - not the final outcome
380 outcome.setFinalOutcome(false);
384 * retries were specified and we've already tried them all - change to
387 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
388 outcome.setResult(OperationResult.FAILURE_RETRIES);
389 outcome.setFinalOutcome(true);
397 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
398 * was previously invoked, and thus that the "operation" is not {@code null}.
400 * @param controller controller for all of the retries
401 * @param attempt latest attempt number, starting with 1
402 * @return a function to get the next future to execute
404 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
405 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
407 return operation -> {
408 if (!isActorFailed(operation)) {
409 // wrong type or wrong operation - just leave it as is
410 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
411 controller.complete(operation);
412 return new CompletableFuture<>();
415 if (getRetry(params.getRetry()) <= 0) {
416 // no retries - already marked as FAILURE, so just return it
417 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
418 controller.complete(operation);
419 return new CompletableFuture<>();
423 * Retry the operation.
425 long waitMs = getRetryWaitMs();
426 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
428 return sleep(waitMs, TimeUnit.MILLISECONDS)
429 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
434 * Convenience method that starts a sleep(), running via a future.
436 * @param sleepTime time to sleep
437 * @param unit time unit
438 * @return a future that will complete when the sleep completes
440 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
441 if (sleepTime <= 0) {
442 return CompletableFuture.completedFuture(null);
445 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
449 * Converts an exception into an operation outcome, returning a copy of the outcome to
450 * prevent background jobs from changing it.
452 * @param type type of item throwing the exception
453 * @return a function that will convert an exception into an operation outcome
455 private Function<Throwable, OperationOutcome> fromException(String type) {
458 OperationOutcome outcome = makeOutcome();
460 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
461 // do not include exception in the message, as it just clutters the log
462 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
463 params.getRequestId());
465 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
466 params.getRequestId(), thrown);
469 return setOutcome(outcome, thrown);
474 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
475 * any outstanding futures when one completes.
477 * @param futureMakers function to make a future. If the function returns
478 * {@code null}, then no future is created for that function. On the other
479 * hand, if the function throws an exception, then the previously created
480 * functions are canceled and the exception is re-thrown
481 * @return a future to cancel or await an outcome, or {@code null} if no futures were
482 * created. If this future is canceled, then all of the futures will be
485 public CompletableFuture<OperationOutcome> anyOf(
486 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
488 return anyOf(Arrays.asList(futureMakers));
492 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
493 * any outstanding futures when one completes.
495 * @param futureMakers function to make a future. If the function returns
496 * {@code null}, then no future is created for that function. On the other
497 * hand, if the function throws an exception, then the previously created
498 * functions are canceled and the exception is re-thrown
499 * @return a future to cancel or await an outcome, or {@code null} if no futures were
500 * created. If this future is canceled, then all of the futures will be
501 * canceled. Similarly, when this future completes, any incomplete futures
504 public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
506 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
508 CompletableFuture<OperationOutcome>[] futures =
509 attachFutures(controller, futureMakers, UnaryOperator.identity());
511 if (futures.length == 0) {
512 // no futures were started
516 if (futures.length == 1) {
520 CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast)
521 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
527 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
529 * @param futureMakers function to make a future. If the function returns
530 * {@code null}, then no future is created for that function. On the other
531 * hand, if the function throws an exception, then the previously created
532 * functions are canceled and the exception is re-thrown
533 * @return a future to cancel or await an outcome, or {@code null} if no futures were
534 * created. If this future is canceled, then all of the futures will be
537 public CompletableFuture<OperationOutcome> allOf(
538 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
540 return allOf(Arrays.asList(futureMakers));
544 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
546 * @param futureMakers function to make a future. If the function returns
547 * {@code null}, then no future is created for that function. On the other
548 * hand, if the function throws an exception, then the previously created
549 * functions are canceled and the exception is re-thrown
550 * @return a future to cancel or await an outcome, or {@code null} if no futures were
551 * created. If this future is canceled, then all of the futures will be
552 * canceled. Similarly, when this future completes, any incomplete futures
555 public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
556 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
558 Queue<OperationOutcome> outcomes = new LinkedList<>();
560 CompletableFuture<OperationOutcome>[] futures =
561 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
562 synchronized (outcomes) {
563 outcomes.add(outcome);
568 if (futures.length == 0) {
569 // no futures were started
573 if (futures.length == 1) {
578 CompletableFuture.allOf(futures)
579 .thenApply(unused -> combineOutcomes(outcomes))
580 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
587 * Invokes the functions to create the futures and attaches them to the controller.
589 * @param controller master controller for all of the futures
590 * @param futureMakers futures to be attached to the controller
591 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
592 * Returns the adorned future
593 * @return an array of futures, possibly zero-length. If the array is of size one,
594 * then that one item should be returned instead of the controller
596 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
597 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
598 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
600 if (futureMakers.isEmpty()) {
601 @SuppressWarnings("unchecked")
602 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
606 // the last, unadorned future that is created
607 CompletableFuture<OperationOutcome> lastFuture = null;
609 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
612 for (var maker : futureMakers) {
614 CompletableFuture<OperationOutcome> future = maker.get();
615 if (future == null) {
619 // propagate "stop" to the future
620 controller.add(future);
622 futures.add(adorn.apply(future));
626 } catch (RuntimeException e) {
627 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
628 controller.cancel(false);
633 @SuppressWarnings("unchecked")
634 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
636 if (result.length == 1) {
637 // special case - return the unadorned future
638 result[0] = lastFuture;
642 return futures.toArray(result);
646 * Combines the outcomes from a set of tasks.
648 * @param outcomes outcomes to be examined
649 * @return the combined outcome
651 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
653 // identify the outcome with the highest priority
654 OperationOutcome outcome = outcomes.remove();
655 int priority = detmPriority(outcome);
657 for (OperationOutcome outcome2 : outcomes) {
658 int priority2 = detmPriority(outcome2);
660 if (priority2 > priority) {
662 priority = priority2;
666 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
667 (outcome == null ? null : outcome.getResult()), params.getRequestId());
673 * Determines the priority of an outcome based on its result.
675 * @param outcome outcome to examine, or {@code null}
676 * @return the outcome's priority
678 protected int detmPriority(OperationOutcome outcome) {
679 if (outcome == null || outcome.getResult() == null) {
683 switch (outcome.getResult()) {
690 case FAILURE_RETRIES:
696 case FAILURE_TIMEOUT:
699 case FAILURE_EXCEPTION:
706 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
707 * not created until the previous task completes. The pipeline returns the outcome of
708 * the last task executed.
710 * @param futureMakers functions to make the futures
711 * @return a future to cancel the sequence or await the outcome
713 public CompletableFuture<OperationOutcome> sequence(
714 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
716 return sequence(Arrays.asList(futureMakers));
720 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
721 * not created until the previous task completes. The pipeline returns the outcome of
722 * the last task executed.
724 * @param futureMakers functions to make the futures
725 * @return a future to cancel the sequence or await the outcome, or {@code null} if
726 * there were no tasks to perform
728 public CompletableFuture<OperationOutcome> sequence(
729 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
731 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
733 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
734 if (nextTask == null) {
739 if (queue.isEmpty()) {
740 // only one task - just return it rather than wrapping it in a controller
745 * multiple tasks - need a controller to stop whichever task is currently
748 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
749 final Executor executor = params.getExecutor();
752 controller.wrap(nextTask)
753 .thenCompose(nextTaskOnSuccess(controller, queue))
754 .whenCompleteAsync(controller.delayedComplete(), executor);
761 * Executes the next task in the queue, if the previous outcome was successful.
763 * @param controller pipeline controller
764 * @param taskQueue queue of tasks to be performed
765 * @return a future to execute the remaining tasks, or the current outcome, if it's a
766 * failure, or if there are no more tasks
768 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
769 PipelineControllerFuture<OperationOutcome> controller,
770 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
773 if (!isSuccess(outcome)) {
774 // return the failure
775 return CompletableFuture.completedFuture(outcome);
778 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
779 if (nextTask == null) {
780 // no tasks - just return the success
781 return CompletableFuture.completedFuture(outcome);
787 .thenCompose(nextTaskOnSuccess(controller, taskQueue));
793 * Gets the next task from the queue, skipping those that are {@code null}.
795 * @param taskQueue task queue
796 * @return the next task, or {@code null} if the queue is now empty
798 private CompletableFuture<OperationOutcome> getNextTask(
799 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
801 Supplier<CompletableFuture<OperationOutcome>> maker;
803 while ((maker = taskQueue.poll()) != null) {
804 CompletableFuture<OperationOutcome> future = maker.get();
805 if (future != null) {
814 * Sets the start time of the operation and invokes the callback to indicate that the
815 * operation has started. Does nothing if the pipeline has been stopped.
817 * This assumes that the "outcome" is not {@code null}.
819 * @param callbacks used to determine if the start callback can be invoked
820 * @return a function that sets the start time and invokes the callback
822 protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
824 return (outcome, thrown) -> {
826 if (callbacks.canStart()) {
827 outcome.setSubRequestId(getSubRequestId());
828 outcome.setStart(callbacks.getStartTime());
829 outcome.setEnd(null);
831 // pass a copy to the callback
832 OperationOutcome outcome2 = new OperationOutcome(outcome);
833 outcome2.setFinalOutcome(false);
834 params.callbackStarted(outcome2);
840 * Sets the end time of the operation and invokes the callback to indicate that the
841 * operation has completed. Does nothing if the pipeline has been stopped.
843 * This assumes that the "outcome" is not {@code null}.
845 * Note: the start time must be a reference rather than a plain value, because it's
846 * value must be gotten on-demand, when the returned function is executed at a later
849 * @param callbacks used to determine if the end callback can be invoked
850 * @return a function that sets the end time and invokes the callback
852 protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
854 return (outcome, thrown) -> {
855 if (callbacks.canEnd()) {
856 outcome.setSubRequestId(getSubRequestId());
857 outcome.setStart(callbacks.getStartTime());
858 outcome.setEnd(callbacks.getEndTime());
860 // pass a copy to the callback
861 params.callbackCompleted(new OperationOutcome(outcome));
867 * Sets an operation's outcome and message, based on a throwable.
869 * @param operation operation to be updated
870 * @return the updated operation
872 public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
873 OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
874 : OperationResult.FAILURE_EXCEPTION);
875 return setOutcome(operation, result);
879 * Sets an operation's outcome and default message based on the result.
881 * @param operation operation to be updated
882 * @param result result of the operation
883 * @return the updated operation
885 public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
886 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
887 operation.setResult(result);
888 operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
889 : ControlLoopOperation.FAILED_MSG);
895 * Makes an outcome, populating the "target" field with the contents of the target
898 * @return a new operation outcome
900 protected OperationOutcome makeOutcome() {
901 OperationOutcome outcome = params.makeOutcome();
902 outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY));
907 * Determines if a throwable is due to a timeout.
909 * @param thrown throwable of interest
910 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
912 protected boolean isTimeout(Throwable thrown) {
913 if (thrown instanceof CompletionException) {
914 thrown = thrown.getCause();
917 return (thrown instanceof TimeoutException);
921 * Logs a message. If the message is not of type, String, then it attempts to
922 * pretty-print it into JSON before logging.
924 * @param direction IN or OUT
925 * @param infra communication infrastructure on which it was published
926 * @param source source name (e.g., the URL or Topic name)
927 * @param message message to be logged
928 * @return the JSON text that was logged
930 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
933 json = prettyPrint(message);
935 } catch (IllegalArgumentException e) {
936 String type = (direction == EventType.IN ? "response" : "request");
937 logger.warn("cannot pretty-print {}", type, e);
938 json = message.toString();
941 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
947 * Converts a message to a "pretty-printed" String using the operation's normal
948 * serialization provider (i.e., it's <i>coder</i>).
950 * @param message response to be logged
951 * @return the JSON text that was logged
952 * @throws IllegalArgumentException if the message cannot be converted
954 public <T> String prettyPrint(T message) {
955 if (message == null) {
957 } else if (message instanceof String) {
958 return message.toString();
961 return getCoder().encode(message, true);
962 } catch (CoderException e) {
963 throw new IllegalArgumentException("cannot encode message", e);
968 // these may be overridden by subclasses or junit tests
971 * Gets the retry count.
973 * @param retry retry, extracted from the parameters, or {@code null}
974 * @return the number of retries, or {@code 0} if no retries were specified
976 protected int getRetry(Integer retry) {
977 return (retry == null ? 0 : retry);
981 * Gets the retry wait, in milliseconds.
983 * @return the retry wait, in milliseconds
985 protected long getRetryWaitMs() {
986 return DEFAULT_RETRY_WAIT_MS;
990 * Gets the operation timeout.
992 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
994 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
997 protected long getTimeoutMs(Integer timeoutSec) {
998 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1001 // these may be overridden by junit tests
1003 protected Coder getCoder() {