2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import lombok.AccessLevel;
import lombok.Setter;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Partial implementation of an operator. In general, it's preferable that subclasses
 * override {@link #startOperationAsync(int, OperationOutcome)
 * startOperationAsync()}. However, if that proves to be too difficult, then they can
 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}.
 *
 * <p>The futures returned by the methods within this class can be canceled, and will
 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
 * returned by overridden methods will do the same. Of course, if a class overrides
 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
 * be done to cancel that particular operation.
 *
 * <p>In general tasks in a pipeline are executed by the same thread. However, the following
 * should always be executed via the executor specified in "params":
 * <ul>
 * <li>start callback</li>
 * <li>completion callback</li>
 * <li>controller completion (i.e., delayedComplete())</li>
 * </ul>
 */
83 public abstract class OperationPartial implements Operation {
// NOTE(review): every line in this listing starts with a number, and the gaps between those
// numbers (e.g. 96 -> 99 -> 102) suggest short lines such as annotations or comment
// delimiters were lost -- confirm against the original file.
84 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
85 private static final Coder coder = new StandardCoder();
87 public static final String GUARD_ACTOR_NAME = "GUARD";
88 public static final String GUARD_OPERATION_NAME = "Decision";
// value returned by getRetryWaitMs()
89 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
// operator configuration; getBlockingExecutor() delegates to it
91 private final OperatorConfig config;
94 * Operation parameters.
96 protected final ControlLoopOperationParams params;
// "<actor>.<operation>", built by the constructor
// NOTE(review): getFullName(), getSubRequestId() and getPropertyNames() are used elsewhere in
// this class but no accessor is visible here; presumably they are generated (e.g. by Lombok
// @Getter on these fields) -- confirm.
99 private final String fullName;
102 @Setter(AccessLevel.PROTECTED)
103 private String subRequestId;
106 private final List<String> propertyNames;
109 * Values for the properties identified by {@link #getPropertyNames()}.
111 private final Map<String, Object> properties = new HashMap<>();
/**
 * Constructs the object.
 *
 * @param params operation parameters
 * @param config configuration for this operation
 * @param propertyNames names of properties required by this operation
 */
public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
    this.params = params;
    this.config = config;
    // "<actor>.<operation>"; used as the prefix of most log messages in this class
    this.fullName = params.getActor() + "." + params.getOperation();
    this.propertyNames = propertyNames;
}
128 public Executor getBlockingExecutor() {
129 return config.getBlockingExecutor();
133 public String getActorName() {
134 return params.getActor();
138 public String getName() {
139 return params.getOperation();
143 public boolean containsProperty(String name) {
144 return properties.containsKey(name);
148 public void setProperty(String name, Object value) {
149 logger.info("{}: set property {}={}", getFullName(), name, value);
150 properties.put(name, value);
153 @SuppressWarnings("unchecked")
155 public <T> T getProperty(String name) {
156 return (T) properties.get(name);
160 * Gets a property value, throwing an exception if it's missing.
162 * @param name property name
163 * @param propertyType property type, used in an error message if the property value
165 * @return the property value
167 @SuppressWarnings("unchecked")
168 protected <T> T getRequiredProperty(String name, String propertyType) {
169 T value = (T) properties.get(name);
171 throw new IllegalStateException("missing " + propertyType);
178 public CompletableFuture<OperationOutcome> start() {
179 // allocate a controller for the entire operation
180 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
183 return startOperationAttempt(controller, 1);
187 * Starts the operation attempt. When all retries complete, it will complete the
190 * @param controller controller for all operation attempts
191 * @param attempt attempt number, typically starting with 1
192 * @return a future that will return the final result of all attempts
194 private CompletableFuture<OperationOutcome> startOperationAttempt(
195 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
197 generateSubRequestId(attempt);
199 // propagate "stop" to the operation attempt
200 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
201 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
207 * Generates and sets {@link #subRequestId} to a new subrequest ID.
209 * @param attempt attempt number, typically starting with 1
211 public void generateSubRequestId(int attempt) {
212 // Note: this should be "protected", but that makes junits much messier
214 setSubRequestId(UUID.randomUUID().toString());
218 * Starts the operation attempt, without doing any retries.
220 * @param params operation parameters
221 * @param attempt attempt number, typically starting with 1
222 * @return a future that will return the result of a single operation attempt
224 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
226 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
228 final Executor executor = params.getExecutor();
229 final OperationOutcome outcome = makeOutcome();
230 final CallbackManager callbacks = new CallbackManager();
232 // this operation attempt gets its own controller
233 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
235 // propagate "stop" to the callbacks
236 controller.add(callbacks);
239 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
240 .whenCompleteAsync(callbackStarted(callbacks), executor)
241 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
244 // handle timeouts, if specified
245 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
246 if (timeoutMillis > 0) {
247 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
248 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
252 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
253 * before callbackCompleted() is invoked.
255 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
256 * the pipeline as the last step anyway.
260 future.exceptionally(fromException("operation"))
261 .thenApply(setRetryFlag(attempt))
262 .whenCompleteAsync(callbackStarted(callbacks), executor)
263 .whenCompleteAsync(callbackCompleted(callbacks), executor)
264 .whenCompleteAsync(controller.delayedComplete(), executor);
271 * Determines if the outcome was successful.
273 * @param outcome outcome to examine
274 * @return {@code true} if the outcome was successful
276 protected boolean isSuccess(OperationOutcome outcome) {
277 return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
281 * Determines if the outcome was a failure for this operator.
283 * @param outcome outcome to examine, or {@code null}
284 * @return {@code true} if the outcome is not {@code null} and was a failure
285 * <i>and</i> was associated with this operator, {@code false} otherwise
287 protected boolean isActorFailed(OperationOutcome outcome) {
288 return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
292 * Determines if the given outcome is for this operation.
294 * @param outcome outcome to examine
295 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
297 protected boolean isSameOperation(OperationOutcome outcome) {
298 return OperationOutcome.isFor(outcome, getActorName(), getName());
302 * Invokes the operation as a "future". This method simply invokes
303 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
304 * returning the result via a "future".
306 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
307 * the executor in the "params", as that may bring the background thread pool to a
308 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
311 * This method assumes the following:
313 * <li>the operator is alive</li>
314 * <li>verifyRunning() has been invoked</li>
315 * <li>callbackStarted() has been invoked</li>
316 * <li>the invoker will perform appropriate timeout checks</li>
317 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
320 * @param attempt attempt number, typically starting with 1
321 * @return a function that will start the operation and return its result when
324 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
326 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
330 * Low-level method that performs the operation. This can make the same assumptions
331 * that are made by {@link #doOperationAsFuture()}. This particular method simply
332 * throws an {@link UnsupportedOperationException}.
334 * @param attempt attempt number, typically starting with 1
335 * @param operation the operation being performed
336 * @return the outcome of the operation
338 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
340 throw new UnsupportedOperationException("start operation " + getFullName());
344 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
345 * FAILURE, assuming the policy specifies retries and the retry count has been
348 * @param attempt latest attempt number, starting with 1
349 * @return a function to get the next future to execute
351 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
353 return origOutcome -> {
354 // ensure we have a non-null outcome
355 OperationOutcome outcome;
356 if (origOutcome != null) {
357 outcome = origOutcome;
359 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
360 outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE);
363 // ensure correct actor/operation
364 outcome.setActor(getActorName());
365 outcome.setOperation(getName());
367 // determine if we should retry, based on the result
368 if (outcome.getResult() != OperationResult.FAILURE) {
369 // do not retry success or other failure types (e.g., exception)
370 outcome.setFinalOutcome(true);
374 int retry = getRetry(params.getRetry());
376 // no retries were specified
377 outcome.setFinalOutcome(true);
379 } else if (attempt <= retry) {
380 // have more retries - not the final outcome
381 outcome.setFinalOutcome(false);
385 * retries were specified and we've already tried them all - change to
388 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
389 outcome.setResult(OperationResult.FAILURE_RETRIES);
390 outcome.setFinalOutcome(true);
398 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
399 * was previously invoked, and thus that the "operation" is not {@code null}.
401 * @param controller controller for all of the retries
402 * @param attempt latest attempt number, starting with 1
403 * @return a function to get the next future to execute
405 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
406 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
408 return operation -> {
409 if (!isActorFailed(operation)) {
410 // wrong type or wrong operation - just leave it as is
411 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
412 controller.complete(operation);
413 return new CompletableFuture<>();
416 if (getRetry(params.getRetry()) <= 0) {
417 // no retries - already marked as FAILURE, so just return it
418 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
419 controller.complete(operation);
420 return new CompletableFuture<>();
424 * Retry the operation.
426 long waitMs = getRetryWaitMs();
427 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
429 return sleep(waitMs, TimeUnit.MILLISECONDS)
430 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
435 * Convenience method that starts a sleep(), running via a future.
437 * @param sleepTime time to sleep
438 * @param unit time unit
439 * @return a future that will complete when the sleep completes
441 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
442 if (sleepTime <= 0) {
443 return CompletableFuture.completedFuture(null);
446 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
450 * Converts an exception into an operation outcome, returning a copy of the outcome to
451 * prevent background jobs from changing it.
453 * @param type type of item throwing the exception
454 * @return a function that will convert an exception into an operation outcome
456 private Function<Throwable, OperationOutcome> fromException(String type) {
459 OperationOutcome outcome = makeOutcome();
461 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
462 // do not include exception in the message, as it just clutters the log
463 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
464 params.getRequestId());
466 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
467 params.getRequestId(), thrown);
470 return setOutcome(outcome, thrown);
475 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
476 * any outstanding futures when one completes.
478 * @param futureMakers function to make a future. If the function returns
479 * {@code null}, then no future is created for that function. On the other
480 * hand, if the function throws an exception, then the previously created
481 * functions are canceled and the exception is re-thrown
482 * @return a future to cancel or await an outcome, or {@code null} if no futures were
483 * created. If this future is canceled, then all of the futures will be
486 public CompletableFuture<OperationOutcome> anyOf(
487 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
489 return anyOf(Arrays.asList(futureMakers));
493 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
494 * any outstanding futures when one completes.
496 * @param futureMakers function to make a future. If the function returns
497 * {@code null}, then no future is created for that function. On the other
498 * hand, if the function throws an exception, then the previously created
499 * functions are canceled and the exception is re-thrown
500 * @return a future to cancel or await an outcome, or {@code null} if no futures were
501 * created. If this future is canceled, then all of the futures will be
502 * canceled. Similarly, when this future completes, any incomplete futures
505 public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
507 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
509 CompletableFuture<OperationOutcome>[] futures =
510 attachFutures(controller, futureMakers, UnaryOperator.identity());
512 if (futures.length == 0) {
513 // no futures were started
517 if (futures.length == 1) {
521 CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
522 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
528 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
530 * @param futureMakers function to make a future. If the function returns
531 * {@code null}, then no future is created for that function. On the other
532 * hand, if the function throws an exception, then the previously created
533 * functions are canceled and the exception is re-thrown
534 * @return a future to cancel or await an outcome, or {@code null} if no futures were
535 * created. If this future is canceled, then all of the futures will be
538 public CompletableFuture<OperationOutcome> allOf(
539 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
541 return allOf(Arrays.asList(futureMakers));
545 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
547 * @param futureMakers function to make a future. If the function returns
548 * {@code null}, then no future is created for that function. On the other
549 * hand, if the function throws an exception, then the previously created
550 * functions are canceled and the exception is re-thrown
551 * @return a future to cancel or await an outcome, or {@code null} if no futures were
552 * created. If this future is canceled, then all of the futures will be
553 * canceled. Similarly, when this future completes, any incomplete futures
556 public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
557 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
559 Queue<OperationOutcome> outcomes = new LinkedList<>();
561 CompletableFuture<OperationOutcome>[] futures =
562 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
563 synchronized (outcomes) {
564 outcomes.add(outcome);
569 if (futures.length == 0) {
570 // no futures were started
574 if (futures.length == 1) {
579 CompletableFuture.allOf(futures)
580 .thenApply(unused -> combineOutcomes(outcomes))
581 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
588 * Invokes the functions to create the futures and attaches them to the controller.
590 * @param controller master controller for all of the futures
591 * @param futureMakers futures to be attached to the controller
592 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
593 * Returns the adorned future
594 * @return an array of futures, possibly zero-length. If the array is of size one,
595 * then that one item should be returned instead of the controller
597 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
598 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
599 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
601 if (futureMakers.isEmpty()) {
602 @SuppressWarnings("unchecked")
603 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
607 // the last, unadorned future that is created
608 CompletableFuture<OperationOutcome> lastFuture = null;
610 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
613 for (var maker : futureMakers) {
615 CompletableFuture<OperationOutcome> future = maker.get();
616 if (future == null) {
620 // propagate "stop" to the future
621 controller.add(future);
623 futures.add(adorn.apply(future));
627 } catch (RuntimeException e) {
628 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
629 controller.cancel(false);
634 @SuppressWarnings("unchecked")
635 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
637 if (result.length == 1) {
638 // special case - return the unadorned future
639 result[0] = lastFuture;
643 return futures.toArray(result);
647 * Combines the outcomes from a set of tasks.
649 * @param outcomes outcomes to be examined
650 * @return the combined outcome
652 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
654 // identify the outcome with the highest priority
655 OperationOutcome outcome = outcomes.remove();
656 int priority = detmPriority(outcome);
658 for (OperationOutcome outcome2 : outcomes) {
659 int priority2 = detmPriority(outcome2);
661 if (priority2 > priority) {
663 priority = priority2;
667 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
668 (outcome == null ? null : outcome.getResult()), params.getRequestId());
674 * Determines the priority of an outcome based on its result.
676 * @param outcome outcome to examine, or {@code null}
677 * @return the outcome's priority
// NOTE(review): this method is incomplete in this listing. The embedded numbering jumps
// 680 -> 684 -> 691 -> 697 -> 700, so the return statements and several case labels of the
// switch are not visible, and the priority assigned to each result cannot be determined from
// here. Restore the body from the original file; do not reconstruct it by guesswork.
// The only visible use is combineOutcomes(), which keeps the outcome with the largest value.
679 protected int detmPriority(OperationOutcome outcome) {
680 if (outcome == null || outcome.getResult() == null) {
684 switch (outcome.getResult()) {
691 case FAILURE_RETRIES:
697 case FAILURE_TIMEOUT:
700 case FAILURE_EXCEPTION:
707 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
708 * not created until the previous task completes. The pipeline returns the outcome of
709 * the last task executed.
711 * @param futureMakers functions to make the futures
712 * @return a future to cancel the sequence or await the outcome
714 public CompletableFuture<OperationOutcome> sequence(
715 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
717 return sequence(Arrays.asList(futureMakers));
721 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
722 * not created until the previous task completes. The pipeline returns the outcome of
723 * the last task executed.
725 * @param futureMakers functions to make the futures
726 * @return a future to cancel the sequence or await the outcome, or {@code null} if
727 * there were no tasks to perform
729 public CompletableFuture<OperationOutcome> sequence(
730 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
732 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
734 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
735 if (nextTask == null) {
740 if (queue.isEmpty()) {
741 // only one task - just return it rather than wrapping it in a controller
746 * multiple tasks - need a controller to stop whichever task is currently
749 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
750 final Executor executor = params.getExecutor();
753 controller.wrap(nextTask)
754 .thenCompose(nextTaskOnSuccess(controller, queue))
755 .whenCompleteAsync(controller.delayedComplete(), executor);
762 * Executes the next task in the queue, if the previous outcome was successful.
764 * @param controller pipeline controller
765 * @param taskQueue queue of tasks to be performed
766 * @return a future to execute the remaining tasks, or the current outcome, if it's a
767 * failure, or if there are no more tasks
769 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
770 PipelineControllerFuture<OperationOutcome> controller,
771 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
774 if (!isSuccess(outcome)) {
775 // return the failure
776 return CompletableFuture.completedFuture(outcome);
779 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
780 if (nextTask == null) {
781 // no tasks - just return the success
782 return CompletableFuture.completedFuture(outcome);
788 .thenCompose(nextTaskOnSuccess(controller, taskQueue));
794 * Gets the next task from the queue, skipping those that are {@code null}.
796 * @param taskQueue task queue
797 * @return the next task, or {@code null} if the queue is now empty
799 private CompletableFuture<OperationOutcome> getNextTask(
800 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
802 Supplier<CompletableFuture<OperationOutcome>> maker;
804 while ((maker = taskQueue.poll()) != null) {
805 CompletableFuture<OperationOutcome> future = maker.get();
806 if (future != null) {
815 * Sets the start time of the operation and invokes the callback to indicate that the
816 * operation has started. Does nothing if the pipeline has been stopped.
818 * This assumes that the "outcome" is not {@code null}.
820 * @param callbacks used to determine if the start callback can be invoked
821 * @return a function that sets the start time and invokes the callback
823 protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
825 return (outcome, thrown) -> {
827 if (callbacks.canStart()) {
828 outcome.setSubRequestId(getSubRequestId());
829 outcome.setStart(callbacks.getStartTime());
830 outcome.setEnd(null);
832 // pass a copy to the callback
833 OperationOutcome outcome2 = new OperationOutcome(outcome);
834 outcome2.setFinalOutcome(false);
835 params.callbackStarted(outcome2);
841 * Sets the end time of the operation and invokes the callback to indicate that the
842 * operation has completed. Does nothing if the pipeline has been stopped.
844 * This assumes that the "outcome" is not {@code null}.
846 * Note: the start time must be a reference rather than a plain value, because it's
847 * value must be gotten on-demand, when the returned function is executed at a later
850 * @param callbacks used to determine if the end callback can be invoked
851 * @return a function that sets the end time and invokes the callback
853 protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
855 return (outcome, thrown) -> {
856 if (callbacks.canEnd()) {
857 outcome.setSubRequestId(getSubRequestId());
858 outcome.setStart(callbacks.getStartTime());
859 outcome.setEnd(callbacks.getEndTime());
861 // pass a copy to the callback
862 params.callbackCompleted(new OperationOutcome(outcome));
868 * Sets an operation's outcome and message, based on a throwable.
870 * @param operation operation to be updated
871 * @return the updated operation
873 public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
874 OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
875 : OperationResult.FAILURE_EXCEPTION);
876 return setOutcome(operation, result);
880 * Sets an operation's outcome and default message based on the result.
882 * @param operation operation to be updated
883 * @param result result of the operation
884 * @return the updated operation
886 public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
887 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
888 operation.setResult(result);
889 operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
890 : ControlLoopOperation.FAILED_MSG);
896 * Makes an outcome, populating the "target" field with the contents of the target
899 * @return a new operation outcome
901 protected OperationOutcome makeOutcome() {
902 return params.makeOutcome(getProperty(OperationProperties.AAI_TARGET_ENTITY));
906 * Determines if a throwable is due to a timeout.
908 * @param thrown throwable of interest
909 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
911 protected boolean isTimeout(Throwable thrown) {
912 if (thrown instanceof CompletionException) {
913 thrown = thrown.getCause();
916 return (thrown instanceof TimeoutException);
920 * Logs a message. If the message is not of type, String, then it attempts to
921 * pretty-print it into JSON before logging.
923 * @param direction IN or OUT
924 * @param infra communication infrastructure on which it was published
925 * @param source source name (e.g., the URL or Topic name)
926 * @param message message to be logged
927 * @return the JSON text that was logged
929 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
932 json = prettyPrint(message);
934 } catch (IllegalArgumentException e) {
935 String type = (direction == EventType.IN ? "response" : "request");
936 logger.warn("cannot pretty-print {}", type, e);
937 json = message.toString();
940 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
946 * Converts a message to a "pretty-printed" String using the operation's normal
947 * serialization provider (i.e., it's <i>coder</i>).
949 * @param message response to be logged
950 * @return the JSON text that was logged
951 * @throws IllegalArgumentException if the message cannot be converted
// NOTE(review): this method is incomplete in this listing. The statement executed when the
// message is null (between the lines numbered 954 and 956) is not visible, nor are the "else"
// and "try" lines that must precede the line numbered 960 -- restore them from the original.
953 public <T> String prettyPrint(T message) {
954 if (message == null) {
// a String is returned as is, without encoding it
956 } else if (message instanceof String) {
957 return message.toString();
// anything else is encoded via the coder returned by getCoder()
960 return getCoder().encode(message, true);
961 } catch (CoderException e) {
// rethrown as an unchecked exception, preserving the cause
962 throw new IllegalArgumentException("cannot encode message", e);
967 // these may be overridden by subclasses or junit tests
970 * Gets the retry count.
972 * @param retry retry, extracted from the parameters, or {@code null}
973 * @return the number of retries, or {@code 0} if no retries were specified
975 protected int getRetry(Integer retry) {
976 return (retry == null ? 0 : retry);
980 * Gets the retry wait, in milliseconds.
982 * @return the retry wait, in milliseconds
984 protected long getRetryWaitMs() {
985 return DEFAULT_RETRY_WAIT_MS;
989 * Gets the operation timeout.
991 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
993 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
996 protected long getTimeoutMs(Integer timeoutSec) {
997 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1000 // these may be overridden by junit tests
1002 protected Coder getCoder() {