2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.CompletionException;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import java.util.function.BiConsumer;
30 import java.util.function.Function;
31 import org.onap.policy.controlloop.ControlLoopOperation;
32 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
33 import org.onap.policy.controlloop.actorserviceprovider.Operation;
34 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
35 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
36 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
37 import org.onap.policy.controlloop.policy.PolicyResult;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * Partial implementation of an operator. In general, it's preferable that subclasses
43 * would override {@link #startOperationAsync(int, OperationOutcome)
44 * startOperationAsync()}. However, if that proves to be too difficult, then they can
45 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
46 * if the operation requires any preprocessor steps, the subclass may choose to override
47 * {@link #startPreprocessorAsync()}.
49 * The futures returned by the methods within this class can be canceled, and will
50 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
51 * returned by overridden methods will do the same. Of course, if a class overrides
52 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
53 * be done to cancel that particular operation.
55 public abstract class OperationPartial implements Operation {
// Logger shared by every instance of this class.
private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);

/**
 * Default time, in milliseconds, to wait before retrying a failed operation attempt.
 */
public static final long DEFAULT_RETRY_WAIT_MS = 1000L;

// values extracted from the operator

/**
 * Operator that created this operation. It supplies the names, the blocking executor
 * and the "alive" state used by this class.
 */
private final OperatorPartial operator;

/**
 * Operation parameters.
 */
protected final ControlLoopOperationParams params;
/**
 * Creates an operation bound to its parameters and to the operator that owns it.
 *
 * @param params operation parameters
 * @param operator operator that created this operation
 */
public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
    this.operator = operator;
    this.params = params;
}
81 public Executor getBlockingExecutor() {
82 return operator.getBlockingExecutor();
85 public String getFullName() {
86 return operator.getFullName();
89 public String getActorName() {
90 return operator.getActorName();
93 public String getName() {
94 return operator.getName();
98 public final CompletableFuture<OperationOutcome> start() {
99 if (!operator.isAlive()) {
100 throw new IllegalStateException("operation is not running: " + getFullName());
103 // allocate a controller for the entire operation
104 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
106 CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
107 if (preproc == null) {
108 // no preprocessor required - just start the operation
109 return startOperationAttempt(controller, 1);
113 * Do preprocessor first and then, if successful, start the operation. Note:
114 * operations create their own outcome, ignoring the outcome from any previous
117 * Wrap the preprocessor to ensure "stop" is propagated to it.
120 controller.wrap(preproc)
121 .exceptionally(fromException("preprocessor of operation"))
122 .thenCompose(handlePreprocessorFailure(controller))
123 .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
124 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
131 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
132 * invokes the call-backs, marks the controller complete, and returns an incomplete
133 * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
136 * Assumes that no callbacks have been invoked yet.
138 * @param controller pipeline controller
139 * @return a function that checks the outcome status and continues, if successful, or
140 * indicates a failure otherwise
142 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
143 PipelineControllerFuture<OperationOutcome> controller) {
147 if (outcome != null && isSuccess(outcome)) {
148 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
149 return CompletableFuture.completedFuture(outcome);
152 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
154 final Executor executor = params.getExecutor();
155 final CallbackManager callbacks = new CallbackManager();
157 // propagate "stop" to the callbacks
158 controller.add(callbacks);
160 final OperationOutcome outcome2 = params.makeOutcome();
162 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
164 outcome2.setResult(PolicyResult.FAILURE_GUARD);
165 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
168 CompletableFuture.completedFuture(outcome2)
169 .whenCompleteAsync(callbackStarted(callbacks), executor)
170 .whenCompleteAsync(callbackCompleted(callbacks), executor)
171 .whenCompleteAsync(controller.delayedComplete(), executor);
174 return new CompletableFuture<>();
179 * Invokes the operation's preprocessor step(s) as a "future". This method simply
180 * invokes {@link #startGuardAsync()}.
182 * This method assumes the following:
184 * <li>the operator is alive</li>
185 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
188 * @return a function that will start the preprocessor and returns its outcome, or
189 * {@code null} if this operation needs no preprocessor
191 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
192 return startGuardAsync();
196 * Invokes the operation's guard step(s) as a "future". This method simply returns
199 * This method assumes the following:
201 * <li>the operator is alive</li>
202 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
205 * @return a function that will start the guard checks and returns its outcome, or
206 * {@code null} if this operation has no guard
208 protected CompletableFuture<OperationOutcome> startGuardAsync() {
213 * Starts the operation attempt, with no preprocessor. When all retries complete, it
214 * will complete the controller.
216 * @param controller controller for all operation attempts
217 * @param attempt attempt number, typically starting with 1
218 * @return a future that will return the final result of all attempts
220 private CompletableFuture<OperationOutcome> startOperationAttempt(
221 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
223 // propagate "stop" to the operation attempt
224 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
225 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
231 * Starts the operation attempt, without doing any retries.
233 * @param params operation parameters
234 * @param attempt attempt number, typically starting with 1
235 * @return a future that will return the result of a single operation attempt
237 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
239 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
241 final Executor executor = params.getExecutor();
242 final OperationOutcome outcome = params.makeOutcome();
243 final CallbackManager callbacks = new CallbackManager();
245 // this operation attempt gets its own controller
246 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
248 // propagate "stop" to the callbacks
249 controller.add(callbacks);
252 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
253 .whenCompleteAsync(callbackStarted(callbacks), executor)
254 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
257 // handle timeouts, if specified
258 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
259 if (timeoutMillis > 0) {
260 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
261 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
265 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
266 * before callbackCompleted() is invoked.
268 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
269 * the pipeline as the last step anyway.
273 future.exceptionally(fromException("operation"))
274 .thenApply(setRetryFlag(attempt))
275 .whenCompleteAsync(callbackStarted(callbacks), executor)
276 .whenCompleteAsync(callbackCompleted(callbacks), executor)
277 .whenCompleteAsync(controller.delayedComplete(), executor);
284 * Determines if the outcome was successful.
286 * @param outcome outcome to examine
287 * @return {@code true} if the outcome was successful
289 protected boolean isSuccess(OperationOutcome outcome) {
290 return (outcome.getResult() == PolicyResult.SUCCESS);
294 * Determines if the outcome was a failure for this operator.
296 * @param outcome outcome to examine, or {@code null}
297 * @return {@code true} if the outcome is not {@code null} and was a failure
298 * <i>and</i> was associated with this operator, {@code false} otherwise
300 protected boolean isActorFailed(OperationOutcome outcome) {
301 return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
305 * Determines if the given outcome is for this operation.
307 * @param outcome outcome to examine
308 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
310 protected boolean isSameOperation(OperationOutcome outcome) {
311 return OperationOutcome.isFor(outcome, getActorName(), getName());
315 * Invokes the operation as a "future". This method simply invokes
316 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
317 * returning the result via a "future".
319 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
320 * the executor in the "params", as that may bring the background thread pool to a
321 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
324 * This method assumes the following:
326 * <li>the operator is alive</li>
327 * <li>verifyRunning() has been invoked</li>
328 * <li>callbackStarted() has been invoked</li>
329 * <li>the invoker will perform appropriate timeout checks</li>
330 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
333 * @param attempt attempt number, typically starting with 1
334 * @return a function that will start the operation and return its result when
337 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
339 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
343 * Low-level method that performs the operation. This can make the same assumptions
344 * that are made by {@link #doOperationAsFuture()}. This particular method simply
345 * throws an {@link UnsupportedOperationException}.
347 * @param attempt attempt number, typically starting with 1
348 * @param operation the operation being performed
349 * @return the outcome of the operation
351 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
353 throw new UnsupportedOperationException("start operation " + getFullName());
357 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
358 * FAILURE, assuming the policy specifies retries and the retry count has been
361 * @param attempt latest attempt number, starting with 1
362 * @return a function to get the next future to execute
364 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
366 return operation -> {
367 if (operation != null && !isActorFailed(operation)) {
369 * wrong type or wrong operation - just leave it as is. No need to log
370 * anything here, as retryOnFailure() will log a message
375 // get a non-null operation
376 OperationOutcome oper2;
377 if (operation != null) {
380 oper2 = params.makeOutcome();
381 oper2.setResult(PolicyResult.FAILURE);
384 int retry = getRetry(params.getRetry());
385 if (retry > 0 && attempt > retry) {
387 * retries were specified and we've already tried them all - change to
390 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
391 oper2.setResult(PolicyResult.FAILURE_RETRIES);
399 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
400 * was previously invoked, and thus that the "operation" is not {@code null}.
402 * @param controller controller for all of the retries
403 * @param attempt latest attempt number, starting with 1
404 * @return a function to get the next future to execute
406 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
407 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
409 return operation -> {
410 if (!isActorFailed(operation)) {
411 // wrong type or wrong operation - just leave it as is
412 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
413 controller.complete(operation);
414 return new CompletableFuture<>();
417 if (getRetry(params.getRetry()) <= 0) {
418 // no retries - already marked as FAILURE, so just return it
419 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
420 controller.complete(operation);
421 return new CompletableFuture<>();
425 * Retry the operation.
427 long waitMs = getRetryWaitMs();
428 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
430 return sleep(waitMs, TimeUnit.MILLISECONDS)
431 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
436 * Convenience method that starts a sleep(), running via a future.
438 * @param sleepTime time to sleep
439 * @param unit time unit
440 * @return a future that will complete when the sleep completes
442 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
443 if (sleepTime <= 0) {
444 return CompletableFuture.completedFuture(null);
447 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
451 * Converts an exception into an operation outcome, returning a copy of the outcome to
452 * prevent background jobs from changing it.
454 * @param type type of item throwing the exception
455 * @return a function that will convert an exception into an operation outcome
457 private Function<Throwable, OperationOutcome> fromException(String type) {
460 OperationOutcome outcome = params.makeOutcome();
462 logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
463 params.getRequestId(), thrown);
465 return setOutcome(outcome, thrown);
470 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
471 * any outstanding futures when one completes.
473 * @param futures futures for which to wait
474 * @return a future to cancel or await an outcome. If this future is canceled, then
475 * all of the futures will be canceled
477 protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) {
479 // convert list to an array
480 @SuppressWarnings("rawtypes")
481 CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
483 @SuppressWarnings("unchecked")
484 CompletableFuture<OperationOutcome> result = anyOf(arrFutures);
489 * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
490 * outstanding futures when one completes.
492 * @param futures futures for which to wait
493 * @return a future to cancel or await an outcome. If this future is canceled, then
494 * all of the futures will be canceled
496 protected CompletableFuture<OperationOutcome> anyOf(
497 @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
499 if (futures.length == 1) {
503 final Executor executor = params.getExecutor();
504 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
506 attachFutures(controller, futures);
509 CompletableFuture.anyOf(futures)
510 .thenApply(object -> (OperationOutcome) object)
511 .whenCompleteAsync(controller.delayedComplete(), executor);
518 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
519 * the futures if returned future is canceled. The future returns the "worst" outcome,
520 * based on priority (see {@link #detmPriority(OperationOutcome)}).
522 * @param futures futures for which to wait
523 * @return a future to cancel or await an outcome. If this future is canceled, then
524 * all of the futures will be canceled
526 protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) {
528 // convert list to an array
529 @SuppressWarnings("rawtypes")
530 CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
532 @SuppressWarnings("unchecked")
533 CompletableFuture<OperationOutcome> result = allOf(arrFutures);
538 * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
539 * futures if returned future is canceled. The future returns the "worst" outcome,
540 * based on priority (see {@link #detmPriority(OperationOutcome)}).
542 * @param futures futures for which to wait
543 * @return a future to cancel or await an outcome. If this future is canceled, then
544 * all of the futures will be canceled
546 protected CompletableFuture<OperationOutcome> allOf(
547 @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
549 if (futures.length == 1) {
553 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
555 attachFutures(controller, futures);
557 OperationOutcome[] outcomes = new OperationOutcome[futures.length];
559 @SuppressWarnings("rawtypes")
560 CompletableFuture[] futures2 = new CompletableFuture[futures.length];
562 // record the outcomes of each future when it completes
563 for (int count = 0; count < futures2.length; ++count) {
564 final int count2 = count;
565 futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
569 CompletableFuture.allOf(futures2)
570 .thenApply(unused -> combineOutcomes(outcomes))
571 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
578 * Attaches the given futures to the controller.
580 * @param controller master controller for all of the futures
581 * @param futures futures to be attached to the controller
583 private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
584 @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
586 if (futures.length == 0) {
587 throw new IllegalArgumentException("empty list of futures");
591 for (CompletableFuture<OperationOutcome> future : futures) {
592 controller.add(future);
597 * Combines the outcomes from a set of tasks.
599 * @param outcomes outcomes to be examined
600 * @return the combined outcome
602 private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) {
604 // identify the outcome with the highest priority
605 OperationOutcome outcome = outcomes[0];
606 int priority = detmPriority(outcome);
608 // start with "1", as we've already dealt with "0"
609 for (int count = 1; count < outcomes.length; ++count) {
610 OperationOutcome outcome2 = outcomes[count];
611 int priority2 = detmPriority(outcome2);
613 if (priority2 > priority) {
615 priority = priority2;
619 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
620 (outcome == null ? null : outcome.getResult()), params.getRequestId());
626 * Determines the priority of an outcome based on its result.
628 * @param outcome outcome to examine, or {@code null}
629 * @return the outcome's priority
631 protected int detmPriority(OperationOutcome outcome) {
632 if (outcome == null || outcome.getResult() == null) {
636 switch (outcome.getResult()) {
643 case FAILURE_RETRIES:
649 case FAILURE_TIMEOUT:
652 case FAILURE_EXCEPTION:
659 * Performs a task, after verifying that the controller is still running. Also checks
660 * that the previous outcome was successful, if specified.
662 * @param controller overall pipeline controller
663 * @param checkSuccess {@code true} to check the previous outcome, {@code false}
665 * @param outcome outcome of the previous task
666 * @param task task to be performed
667 * @return the task, if everything checks out. Otherwise, it returns an incomplete
668 * future and completes the controller instead
671 protected CompletableFuture<OperationOutcome> doTask(
672 PipelineControllerFuture<OperationOutcome> controller,
673 boolean checkSuccess, OperationOutcome outcome,
674 CompletableFuture<OperationOutcome> task) {
677 if (checkSuccess && !isSuccess(outcome)) {
679 * must complete before canceling so that cancel() doesn't cause controller to
682 controller.complete(outcome);
684 return new CompletableFuture<>();
687 return controller.wrap(task);
691 * Performs a task, after verifying that the controller is still running. Also checks
692 * that the previous outcome was successful, if specified.
694 * @param controller overall pipeline controller
695 * @param checkSuccess {@code true} to check the previous outcome, {@code false}
697 * @param task function to start the task to be performed
698 * @return a function to perform the task. If everything checks out, then it returns
699 * the task. Otherwise, it returns an incomplete future and completes the
703 protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(
704 PipelineControllerFuture<OperationOutcome> controller,
705 boolean checkSuccess,
706 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
711 if (!controller.isRunning()) {
712 return new CompletableFuture<>();
715 if (checkSuccess && !isSuccess(outcome)) {
716 controller.complete(outcome);
717 return new CompletableFuture<>();
720 return controller.wrap(task.apply(outcome));
725 * Sets the start time of the operation and invokes the callback to indicate that the
726 * operation has started. Does nothing if the pipeline has been stopped.
728 * This assumes that the "outcome" is not {@code null}.
730 * @param callbacks used to determine if the start callback can be invoked
731 * @return a function that sets the start time and invokes the callback
733 private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
735 return (outcome, thrown) -> {
737 if (callbacks.canStart()) {
738 // haven't invoked "start" callback yet
739 outcome.setStart(callbacks.getStartTime());
740 outcome.setEnd(null);
741 params.callbackStarted(outcome);
747 * Sets the end time of the operation and invokes the callback to indicate that the
748 * operation has completed. Does nothing if the pipeline has been stopped.
750 * This assumes that the "outcome" is not {@code null}.
752 * Note: the start time must be a reference rather than a plain value, because it's
753 * value must be gotten on-demand, when the returned function is executed at a later
756 * @param callbacks used to determine if the end callback can be invoked
757 * @return a function that sets the end time and invokes the callback
759 private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
761 return (outcome, thrown) -> {
763 if (callbacks.canEnd()) {
764 outcome.setStart(callbacks.getStartTime());
765 outcome.setEnd(callbacks.getEndTime());
766 params.callbackCompleted(outcome);
772 * Sets an operation's outcome and message, based on a throwable.
774 * @param operation operation to be updated
775 * @return the updated operation
777 protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
778 PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
779 return setOutcome(operation, result);
783 * Sets an operation's outcome and default message based on the result.
785 * @param operation operation to be updated
786 * @param result result of the operation
787 * @return the updated operation
789 public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
790 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
791 operation.setResult(result);
792 operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
793 : ControlLoopOperation.FAILED_MSG);
799 * Determines if a throwable is due to a timeout.
801 * @param thrown throwable of interest
802 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
804 protected boolean isTimeout(Throwable thrown) {
805 if (thrown instanceof CompletionException) {
806 thrown = thrown.getCause();
809 return (thrown instanceof TimeoutException);
812 // these may be overridden by subclasses or junit tests
815 * Gets the retry count.
817 * @param retry retry, extracted from the parameters, or {@code null}
818 * @return the number of retries, or {@code 0} if no retries were specified
820 protected int getRetry(Integer retry) {
821 return (retry == null ? 0 : retry);
825 * Gets the retry wait, in milliseconds.
827 * @return the retry wait, in milliseconds
829 protected long getRetryWaitMs() {
830 return DEFAULT_RETRY_WAIT_MS;
834 * Gets the operation timeout.
836 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
838 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
841 protected long getTimeoutMs(Integer timeoutSec) {
842 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));