2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedList;
28 import java.util.List;
30 import java.util.Queue;
31 import java.util.UUID;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.CompletionException;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.function.BiConsumer;
39 import java.util.function.Function;
40 import java.util.function.Supplier;
41 import java.util.function.UnaryOperator;
42 import lombok.AccessLevel;
45 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
46 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
47 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
48 import org.onap.policy.common.utils.coder.Coder;
49 import org.onap.policy.common.utils.coder.CoderException;
50 import org.onap.policy.common.utils.coder.StandardCoder;
51 import org.onap.policy.controlloop.ControlLoopOperation;
52 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
53 import org.onap.policy.controlloop.actorserviceprovider.Operation;
54 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
59 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
64 * Partial implementation of an operator. In general, it's preferable that subclasses
65 * would override {@link #startOperationAsync(int, OperationOutcome)
66 * startOperationAsync()}. However, if that proves to be too difficult, then they can
67 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}.
69 * The futures returned by the methods within this class can be canceled, and will
70 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
71 * returned by overridden methods will do the same. Of course, if a class overrides
72 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
73 * be done to cancel that particular operation.
75 * In general tasks in a pipeline are executed by the same thread. However, the following
76 * should always be executed via the executor specified in "params":
78 * <li>start callback</li>
79 * <li>completion callback</li>
80 * <li>controller completion (i.e., delayedComplete())</li>
83 public abstract class OperationPartial implements Operation {
84 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
85 private static final Coder coder = new StandardCoder();
87 public static final String GUARD_ACTOR_NAME = "GUARD";
88 public static final String GUARD_OPERATION_NAME = "Decision";
89 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
91 private final OperatorConfig config;
94 * Operation parameters.
96 protected final ControlLoopOperationParams params;
99 private final String fullName;
102 @Setter(AccessLevel.PROTECTED)
103 private String subRequestId;
106 private final List<String> propertyNames;
109 * Values for the properties identified by {@link #getPropertyNames()}.
111 private final Map<String, Object> properties = new HashMap<>();
115 * Constructs the object.
117 * @param params operation parameters
118 * @param config configuration for this operation
119 * @param propertyNames names of properties required by this operation
121 protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
122 this.params = params;
123 this.config = config;
124 this.fullName = params.getActor() + "." + params.getOperation();
125 this.propertyNames = propertyNames;
128 public Executor getBlockingExecutor() {
129 return config.getBlockingExecutor();
133 public String getActorName() {
134 return params.getActor();
138 public String getName() {
139 return params.getOperation();
143 public boolean containsProperty(String name) {
144 return properties.containsKey(name);
148 public void setProperty(String name, Object value) {
149 logger.info("{}: set property {}={}", getFullName(), name, value);
150 properties.put(name, value);
153 @SuppressWarnings("unchecked")
155 public <T> T getProperty(String name) {
156 return (T) properties.get(name);
160 * Gets a property value, throwing an exception if it's missing.
162 * @param name property name
163 * @param propertyType property type, used in an error message if the property value
165 * @return the property value
167 @SuppressWarnings("unchecked")
168 public <T> T getRequiredProperty(String name, String propertyType) {
169 T value = (T) properties.get(name);
171 throw new IllegalStateException("missing " + propertyType);
178 public CompletableFuture<OperationOutcome> start() {
179 // allocate a controller for the entire operation
180 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
183 return startOperationAttempt(controller, 1);
187 * Starts the operation attempt. When all retries complete, it will complete the
190 * @param controller controller for all operation attempts
191 * @param attempt attempt number, typically starting with 1
192 * @return a future that will return the final result of all attempts
194 private CompletableFuture<OperationOutcome> startOperationAttempt(
195 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
197 generateSubRequestId(attempt);
199 // propagate "stop" to the operation attempt
200 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
201 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
207 * Generates and sets {@link #subRequestId} to a new subrequest ID.
209 * @param attempt attempt number, typically starting with 1
211 public void generateSubRequestId(int attempt) {
212 // Note: this should be "protected", but that makes junits much messier
214 setSubRequestId(UUID.randomUUID().toString());
218 * Starts the operation attempt, without doing any retries.
220 * @param attempt attempt number, typically starting with 1
221 * @return a future that will return the result of a single operation attempt
223 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
225 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
227 final var executor = params.getExecutor();
228 final var outcome = makeOutcome();
229 final var callbacks = new CallbackManager();
231 // this operation attempt gets its own controller
232 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
234 // propagate "stop" to the callbacks
235 controller.add(callbacks);
238 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
239 .whenCompleteAsync(callbackStarted(callbacks), executor)
240 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
243 // handle timeouts, if specified
244 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
245 if (timeoutMillis > 0) {
246 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
247 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
251 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
252 * before callbackCompleted() is invoked.
254 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
255 * the pipeline as the last step anyway.
259 future.exceptionally(fromException("operation"))
260 .thenApply(setRetryFlag(attempt))
261 .whenCompleteAsync(callbackStarted(callbacks), executor)
262 .whenCompleteAsync(callbackCompleted(callbacks), executor)
263 .whenCompleteAsync(controller.delayedComplete(), executor);
270 * Determines if the outcome was successful.
272 * @param outcome outcome to examine
273 * @return {@code true} if the outcome was successful
275 protected boolean isSuccess(OperationOutcome outcome) {
276 return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
280 * Determines if the outcome was a failure for this operator.
282 * @param outcome outcome to examine, or {@code null}
283 * @return {@code true} if the outcome is not {@code null} and was a failure
284 * <i>and</i> was associated with this operator, {@code false} otherwise
286 protected boolean isActorFailed(OperationOutcome outcome) {
287 return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
291 * Determines if the given outcome is for this operation.
293 * @param outcome outcome to examine
294 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
296 protected boolean isSameOperation(OperationOutcome outcome) {
297 return OperationOutcome.isFor(outcome, getActorName(), getName());
301 * Invokes the operation as a "future". This method simply invokes
302 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
303 * returning the result via a "future".
305 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
306 * the executor in the "params", as that may bring the background thread pool to a
307 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
310 * This method assumes the following:
312 * <li>the operator is alive</li>
313 * <li>verifyRunning() has been invoked</li>
314 * <li>callbackStarted() has been invoked</li>
315 * <li>the invoker will perform appropriate timeout checks</li>
316 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
319 * @param attempt attempt number, typically starting with 1
320 * @return a function that will start the operation and return its result when
323 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
325 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
329 * Low-level method that performs the operation. This can make the same assumptions
330 * that are made by {@link #doOperationAsFuture()}. This particular method simply
331 * throws an {@link UnsupportedOperationException}.
333 * @param attempt attempt number, typically starting with 1
334 * @param operation the operation being performed
335 * @return the outcome of the operation
337 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
339 throw new UnsupportedOperationException("start operation " + getFullName());
343 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
344 * FAILURE, assuming the policy specifies retries and the retry count has been
347 * @param attempt latest attempt number, starting with 1
348 * @return a function to get the next future to execute
350 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
352 return origOutcome -> {
353 // ensure we have a non-null outcome
354 OperationOutcome outcome;
355 if (origOutcome != null) {
356 outcome = origOutcome;
358 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
359 outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE);
362 // ensure correct actor/operation
363 outcome.setActor(getActorName());
364 outcome.setOperation(getName());
366 // determine if we should retry, based on the result
367 if (outcome.getResult() != OperationResult.FAILURE) {
368 // do not retry success or other failure types (e.g., exception)
369 outcome.setFinalOutcome(true);
373 int retry = getRetry(params.getRetry());
375 // no retries were specified
376 outcome.setFinalOutcome(true);
378 } else if (attempt <= retry) {
379 // have more retries - not the final outcome
380 outcome.setFinalOutcome(false);
384 * retries were specified, and we've already tried them all - change to
387 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
388 outcome.setResult(OperationResult.FAILURE_RETRIES);
389 outcome.setFinalOutcome(true);
397 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
398 * was previously invoked, and thus that the "operation" is not {@code null}.
400 * @param controller controller for all of the retries
401 * @param attempt latest attempt number, starting with 1
402 * @return a function to get the next future to execute
404 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
405 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
407 return operation -> {
408 if (!isActorFailed(operation)) {
409 // wrong type or wrong operation - just leave it as is
410 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
411 controller.complete(operation);
412 return new CompletableFuture<>();
415 if (getRetry(params.getRetry()) <= 0) {
416 // no retries - already marked as FAILURE, so just return it
417 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
418 controller.complete(operation);
419 return new CompletableFuture<>();
423 * Retry the operation.
425 long waitMs = getRetryWaitMs();
426 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
428 return sleep(waitMs, TimeUnit.MILLISECONDS)
429 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
434 * Convenience method that starts a sleep(), running via a future.
436 * @param sleepTime time to sleep
437 * @param unit time unit
438 * @return a future that will complete when the sleep completes
440 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
441 if (sleepTime <= 0) {
442 return CompletableFuture.completedFuture(null);
445 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
449 * Converts an exception into an operation outcome, returning a copy of the outcome to
450 * prevent background jobs from changing it.
452 * @param type type of item throwing the exception
453 * @return a function that will convert an exception into an operation outcome
455 private Function<Throwable, OperationOutcome> fromException(String type) {
458 OperationOutcome outcome = makeOutcome();
460 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
461 // do not include exception in the message, as it just clutters the log
462 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
463 params.getRequestId());
465 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
466 params.getRequestId(), thrown);
469 return setOutcome(outcome, thrown);
474 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
475 * any outstanding futures when one completes.
477 * @param futureMakers function to make a future. If the function returns
478 * {@code null}, then no future is created for that function. On the other
479 * hand, if the function throws an exception, then the previously created
480 * functions are canceled and the exception is re-thrown
481 * @return a future to cancel or await an outcome, or {@code null} if no futures were
482 * created. If this future is canceled, then all of the futures will be
485 public CompletableFuture<OperationOutcome> anyOf(
486 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
488 return anyOf(Arrays.asList(futureMakers));
492 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
493 * any outstanding futures when one completes.
495 * @param futureMakers function to make a future. If the function returns
496 * {@code null}, then no future is created for that function. On the other
497 * hand, if the function throws an exception, then the previously created
498 * functions are canceled and the exception is re-thrown
499 * @return a future to cancel or await an outcome, or {@code null} if no futures were
500 * created. If this future is canceled, then all of the futures will be
501 * canceled. Similarly, when this future completes, any incomplete futures
504 public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
506 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
508 CompletableFuture<OperationOutcome>[] futures =
509 attachFutures(controller, futureMakers, UnaryOperator.identity());
511 if (futures.length == 0) {
512 // no futures were started
516 if (futures.length == 1) {
520 CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast)
521 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
527 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
529 * @param futureMakers function to make a future. If the function returns
530 * {@code null}, then no future is created for that function. On the other
531 * hand, if the function throws an exception, then the previously created
532 * functions are canceled and the exception is re-thrown
533 * @return a future to cancel or await an outcome, or {@code null} if no futures were
534 * created. If this future is canceled, then all of the futures will be
537 public CompletableFuture<OperationOutcome> allOf(
538 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
540 return allOf(Arrays.asList(futureMakers));
544 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
546 * @param futureMakers function to make a future. If the function returns
547 * {@code null}, then no future is created for that function. On the other
548 * hand, if the function throws an exception, then the previously created
549 * functions are canceled and the exception is re-thrown
550 * @return a future to cancel or await an outcome, or {@code null} if no futures were
551 * created. If this future is canceled, then all of the futures will be
552 * canceled. Similarly, when this future completes, any incomplete futures
555 public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
556 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
558 Queue<OperationOutcome> outcomes = new LinkedList<>();
560 CompletableFuture<OperationOutcome>[] futures =
561 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
562 synchronized (outcomes) {
563 outcomes.add(outcome);
568 if (futures.length == 0) {
569 // no futures were started
573 if (futures.length == 1) {
578 CompletableFuture.allOf(futures)
579 .thenApply(unused -> combineOutcomes(outcomes))
580 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
587 * Invokes the functions to create the futures and attaches them to the controller.
589 * @param controller master controller for all of the futures
590 * @param futureMakers futures to be attached to the controller
591 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
592 * Returns the adorned future
593 * @return an array of futures, possibly zero-length. If the array is of size one,
594 * then that one item should be returned instead of the controller
596 @SuppressWarnings("unchecked")
597 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
598 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
599 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
601 if (futureMakers.isEmpty()) {
602 return new CompletableFuture[0];
605 // the last, unadorned future that is created
606 CompletableFuture<OperationOutcome> lastFuture = null;
608 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
611 for (var maker : futureMakers) {
613 CompletableFuture<OperationOutcome> future = maker.get();
614 if (future == null) {
618 // propagate "stop" to the future
619 controller.add(future);
621 futures.add(adorn.apply(future));
625 } catch (RuntimeException e) {
626 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
627 controller.cancel(false);
632 var result = new CompletableFuture[futures.size()];
634 if (result.length == 1) {
635 // special case - return the unadorned future
636 result[0] = lastFuture;
640 return futures.toArray(result);
644 * Combines the outcomes from a set of tasks.
646 * @param outcomes outcomes to be examined
647 * @return the combined outcome
649 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
651 // identify the outcome with the highest priority
652 OperationOutcome outcome = outcomes.remove();
653 int priority = detmPriority(outcome);
655 for (OperationOutcome outcome2 : outcomes) {
656 int priority2 = detmPriority(outcome2);
658 if (priority2 > priority) {
660 priority = priority2;
664 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
665 (outcome == null ? null : outcome.getResult()), params.getRequestId());
671 * Determines the priority of an outcome based on its result.
673 * @param outcome outcome to examine, or {@code null}
674 * @return the outcome's priority
676 protected int detmPriority(OperationOutcome outcome) {
677 if (outcome == null || outcome.getResult() == null) {
681 switch (outcome.getResult()) {
688 case FAILURE_RETRIES:
694 case FAILURE_TIMEOUT:
697 case FAILURE_EXCEPTION:
704 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
705 * not created until the previous task completes. The pipeline returns the outcome of
706 * the last task executed.
708 * @param futureMakers functions to make the futures
709 * @return a future to cancel the sequence or await the outcome
711 public CompletableFuture<OperationOutcome> sequence(
712 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
714 return sequence(Arrays.asList(futureMakers));
718 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
719 * not created until the previous task completes. The pipeline returns the outcome of
720 * the last task executed.
722 * @param futureMakers functions to make the futures
723 * @return a future to cancel the sequence or await the outcome, or {@code null} if
724 * there were no tasks to perform
726 public CompletableFuture<OperationOutcome> sequence(
727 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
729 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
731 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
732 if (nextTask == null) {
737 if (queue.isEmpty()) {
738 // only one task - just return it rather than wrapping it in a controller
743 * multiple tasks - need a controller to stop whichever task is currently
746 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
747 final var executor = params.getExecutor();
750 controller.wrap(nextTask)
751 .thenCompose(nextTaskOnSuccess(controller, queue))
752 .whenCompleteAsync(controller.delayedComplete(), executor);
759 * Executes the next task in the queue, if the previous outcome was successful.
761 * @param controller pipeline controller
762 * @param taskQueue queue of tasks to be performed
763 * @return a future to execute the remaining tasks, or the current outcome, if it's a
764 * failure, or if there are no more tasks
766 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
767 PipelineControllerFuture<OperationOutcome> controller,
768 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
771 if (!isSuccess(outcome)) {
772 // return the failure
773 return CompletableFuture.completedFuture(outcome);
776 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
777 if (nextTask == null) {
778 // no tasks - just return the success
779 return CompletableFuture.completedFuture(outcome);
785 .thenCompose(nextTaskOnSuccess(controller, taskQueue));
791 * Gets the next task from the queue, skipping those that are {@code null}.
793 * @param taskQueue task queue
794 * @return the next task, or {@code null} if the queue is now empty
796 private CompletableFuture<OperationOutcome> getNextTask(
797 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
799 Supplier<CompletableFuture<OperationOutcome>> maker;
801 while ((maker = taskQueue.poll()) != null) {
802 CompletableFuture<OperationOutcome> future = maker.get();
803 if (future != null) {
812 * Sets the start time of the operation and invokes the callback to indicate that the
813 * operation has started. Does nothing if the pipeline has been stopped.
815 * This assumes that the "outcome" is not {@code null}.
817 * @param callbacks used to determine if the start callback can be invoked
818 * @return a function that sets the start time and invokes the callback
820 protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
822 return (outcome, thrown) -> {
824 if (callbacks.canStart()) {
825 outcome.setSubRequestId(getSubRequestId());
826 outcome.setStart(callbacks.getStartTime());
827 outcome.setEnd(null);
829 // pass a copy to the callback
830 var outcome2 = new OperationOutcome(outcome);
831 outcome2.setFinalOutcome(false);
832 params.callbackStarted(outcome2);
838 * Sets the end time of the operation and invokes the callback to indicate that the
839 * operation has completed. Does nothing if the pipeline has been stopped.
841 * This assumes that the "outcome" is not {@code null}.
843 * Note: the start time must be a reference rather than a plain value, because it's
844 * value must be gotten on-demand, when the returned function is executed at a later
847 * @param callbacks used to determine if the end callback can be invoked
848 * @return a function that sets the end time and invokes the callback
850 protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
852 return (outcome, thrown) -> {
853 if (callbacks.canEnd()) {
854 outcome.setSubRequestId(getSubRequestId());
855 outcome.setStart(callbacks.getStartTime());
856 outcome.setEnd(callbacks.getEndTime());
858 // pass a copy to the callback
859 params.callbackCompleted(new OperationOutcome(outcome));
865 * Sets an operation's outcome and message, based on a throwable.
867 * @param operation operation to be updated
868 * @return the updated operation
870 public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
871 OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
872 : OperationResult.FAILURE_EXCEPTION);
873 return setOutcome(operation, result);
877 * Sets an operation's outcome and default message based on the result.
879 * @param operation operation to be updated
880 * @param result result of the operation
881 * @return the updated operation
883 public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
884 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
885 operation.setResult(result);
886 operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
887 : ControlLoopOperation.FAILED_MSG);
893 * Makes an outcome, populating the "target" field with the contents of the target
896 * @return a new operation outcome
898 protected OperationOutcome makeOutcome() {
899 OperationOutcome outcome = params.makeOutcome();
900 outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY));
905 * Determines if a throwable is due to a timeout.
907 * @param thrown throwable of interest
908 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
910 protected boolean isTimeout(Throwable thrown) {
911 if (thrown instanceof CompletionException) {
912 thrown = thrown.getCause();
915 return (thrown instanceof TimeoutException);
919 * Logs a message. If the message is not of type, String, then it attempts to
920 * pretty-print it into JSON before logging.
922 * @param direction IN or OUT
923 * @param infra communication infrastructure on which it was published
924 * @param source source name (e.g., the URL or Topic name)
925 * @param message message to be logged
926 * @return the JSON text that was logged
928 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
931 json = prettyPrint(message);
933 } catch (IllegalArgumentException e) {
934 String type = (direction == EventType.IN ? "response" : "request");
935 logger.warn("cannot pretty-print {}", type, e);
936 json = message.toString();
939 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
945 * Converts a message to a "pretty-printed" String using the operation's normal
946 * serialization provider (i.e., it's <i>coder</i>).
948 * @param message response to be logged
949 * @return the JSON text that was logged
950 * @throws IllegalArgumentException if the message cannot be converted
952 public <T> String prettyPrint(T message) {
953 if (message == null) {
955 } else if (message instanceof String) {
956 return message.toString();
959 return getCoder().encode(message, true);
960 } catch (CoderException e) {
961 throw new IllegalArgumentException("cannot encode message", e);
966 // these may be overridden by subclasses or junit tests
969 * Gets the retry count.
971 * @param retry retry, extracted from the parameters, or {@code null}
972 * @return the number of retries, or {@code 0} if no retries were specified
974 protected int getRetry(Integer retry) {
975 return (retry == null ? 0 : retry);
979 * Gets the retry wait, in milliseconds.
981 * @return the retry wait, in milliseconds
983 protected long getRetryWaitMs() {
984 return DEFAULT_RETRY_WAIT_MS;
988 * Gets the operation timeout.
990 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
992 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
995 protected long getTimeoutMs(Integer timeoutSec) {
996 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
999 // these may be overridden by junit tests
1001 protected Coder getCoder() {