80d8fbd0465bce5822cd6d06be4aae26d09b9875
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperatorPartial.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.time.Instant;
24 import java.util.Map;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.CompletionException;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.concurrent.atomic.AtomicReference;
31 import java.util.function.Function;
32 import lombok.Getter;
33 import org.onap.policy.controlloop.ControlLoopOperation;
34 import org.onap.policy.controlloop.actorserviceprovider.Operator;
35 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
36 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
37 import org.onap.policy.controlloop.policy.Policy;
38 import org.onap.policy.controlloop.policy.PolicyResult;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Partial implementation of an operator. Subclasses can choose to simply implement
44  * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override
45  * {@link #doOperationAsFuture(ControlLoopOperationParams)}.
46  */
47 public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
48
49     private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
50
51     private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString();
52     private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString();
53     private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString();
54
55     @Getter
56     private final String actorName;
57
58     @Getter
59     private final String name;
60
61     /**
62      * Constructs the object.
63      *
64      * @param actorName name of the actor with which this operator is associated
65      * @param name operation name
66      */
67     public OperatorPartial(String actorName, String name) {
68         super(actorName + "." + name);
69         this.actorName = actorName;
70         this.name = name;
71     }
72
73     /**
74      * This method does nothing.
75      */
76     @Override
77     protected void doConfigure(Map<String, Object> parameters) {
78         // do nothing
79     }
80
81     /**
82      * This method does nothing.
83      */
84     @Override
85     protected void doStart() {
86         // do nothing
87     }
88
89     /**
90      * This method does nothing.
91      */
92     @Override
93     protected void doStop() {
94         // do nothing
95     }
96
97     /**
98      * This method does nothing.
99      */
100     @Override
101     protected void doShutdown() {
102         // do nothing
103     }
104
105     @Override
106     public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) {
107         if (!isAlive()) {
108             throw new IllegalStateException("operation is not running: " + getFullName());
109         }
110
111         final Executor executor = params.getExecutor();
112
113         // allocate a controller for the entire operation
114         final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
115
116         CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params);
117         if (preproc == null) {
118             // no preprocessor required - just start the operation
119             return startOperationAttempt(params, controller, 1);
120         }
121
122         // propagate "stop" to the preprocessor
123         controller.add(preproc);
124
125         /*
126          * Do preprocessor first and then, if successful, start the operation. Note:
127          * operations create their own outcome, ignoring the outcome from any previous
128          * steps.
129          */
130         preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor)
131                         .thenComposeAsync(handleFailure(params, controller), executor)
132                         .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)),
133                                         executor);
134
135         return controller;
136     }
137
138     /**
139      * Starts an operation's preprocessor step(s). If the preprocessor fails, then it
140      * invokes the started and completed call-backs.
141      *
142      * @param params operation parameters
143      * @return a future that will return the preprocessor outcome, or {@code null} if this
144      *         operation needs no preprocessor
145      */
146     protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) {
147         logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId());
148
149         final Executor executor = params.getExecutor();
150         final ControlLoopOperation operation = params.makeOutcome();
151
152         final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
153                         doPreprocessorAsFuture(params);
154         if (preproc == null) {
155             // no preprocessor required
156             return null;
157         }
158
159         // allocate a controller for the preprocessor steps
160         final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
161
162         /*
163          * Don't mark it complete until we've built the whole pipeline. This will prevent
164          * the operation from starting until after it has been successfully built (i.e.,
165          * without generating any exceptions).
166          */
167         final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
168
169         // @formatter:off
170         firstFuture
171             .thenComposeAsync(controller.add(preproc), executor)
172             .exceptionally(fromException(params, operation))
173             .whenCompleteAsync(controller.delayedComplete(), executor);
174         // @formatter:on
175
176         // start the pipeline
177         firstFuture.complete(operation);
178
179         return controller;
180     }
181
182     /**
183      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
184      * invokes the call-backs and returns a failed outcome. Otherwise, it returns the
185      * outcome that it received.
186      *
187      * @param params operation parameters
188      * @param controller pipeline controller
189      * @return a function that checks the outcome status and continues, if successful, or
190      *         indicates a failure otherwise
191      */
192     private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure(
193                     ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) {
194
195         return outcome -> {
196
197             if (outcome != null && isSuccess(outcome)) {
198                 logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
199                 return CompletableFuture.completedFuture(outcome);
200             }
201
202             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
203
204             final Executor executor = params.getExecutor();
205             final CallbackManager callbacks = new CallbackManager();
206
207             // propagate "stop" to the callbacks
208             controller.add(callbacks);
209
210             final ControlLoopOperation outcome2 = params.makeOutcome();
211
212             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
213
214             outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString());
215             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
216
217             CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor)
218                             .thenApplyAsync(callbackCompleted(params, callbacks), executor)
219                             .whenCompleteAsync(controller.delayedRemove(callbacks), executor)
220                             .whenCompleteAsync(controller.delayedComplete(), executor);
221
222             return controller;
223         };
224     }
225
226     /**
227      * Invokes the operation's preprocessor step(s) as a "future". This method simply
228      * returns {@code null}.
229      * <p/>
230      * This method assumes the following:
231      * <ul>
232      * <li>the operator is alive</li>
233      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
234      * </ul>
235      *
236      * @param params operation parameters
237      * @return a function that will start the preprocessor and returns its outcome, or
238      *         {@code null} if this operation needs no preprocessor
239      */
240     protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
241                     ControlLoopOperationParams params) {
242         return null;
243     }
244
245     /**
246      * Starts the operation attempt, with no preprocessor. When all retries complete, it
247      * will complete the controller.
248      *
249      * @param params operation parameters
250      * @param controller controller for all operation attempts
251      * @param attempt attempt number, typically starting with 1
252      * @return a future that will return the final result of all attempts
253      */
254     private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params,
255                     PipelineControllerFuture<ControlLoopOperation> controller, int attempt) {
256
257         final Executor executor = params.getExecutor();
258
259         CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt);
260
261         // propagate "stop" to the operation attempt
262         controller.add(future);
263
264         // detach when complete
265         future.whenCompleteAsync(controller.delayedRemove(future), executor)
266                         .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor())
267                         .whenCompleteAsync(controller.delayedComplete(), executor);
268
269         return controller;
270     }
271
272     /**
273      * Starts the operation attempt, without doing any retries.
274      *
275      * @param params operation parameters
276      * @param attempt attempt number, typically starting with 1
277      * @return a future that will return the result of a single operation attempt
278      */
279     private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params,
280                     int attempt) {
281
282         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
283
284         final Executor executor = params.getExecutor();
285         final ControlLoopOperation outcome = params.makeOutcome();
286         final CallbackManager callbacks = new CallbackManager();
287
288         // this operation attempt gets its own controller
289         final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
290
291         // propagate "stop" to the callbacks
292         controller.add(callbacks);
293
294         /*
295          * Don't mark it complete until we've built the whole pipeline. This will prevent
296          * the operation from starting until after it has been successfully built (i.e.,
297          * without generating any exceptions).
298          */
299         final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
300
301         // @formatter:off
302         CompletableFuture<ControlLoopOperation> future2 =
303             firstFuture.thenComposeAsync(verifyRunning(controller, params), executor)
304                         .thenApplyAsync(callbackStarted(params, callbacks), executor)
305                         .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor);
306         // @formatter:on
307
308         // handle timeouts, if specified
309         long timeoutMillis = getTimeOutMillis(params.getPolicy());
310         if (timeoutMillis > 0) {
311             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
312             future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
313         }
314
315         /*
316          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
317          * before callbackCompleted() is invoked.
318          *
319          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
320          * the pipeline as the last step anyway.
321          */
322
323         // @formatter:off
324         future2.exceptionally(fromException(params, outcome))
325                     .thenApplyAsync(setRetryFlag(params, attempt), executor)
326                     .thenApplyAsync(callbackStarted(params, callbacks), executor)
327                     .thenApplyAsync(callbackCompleted(params, callbacks), executor)
328                     .whenCompleteAsync(controller.delayedComplete(), executor);
329         // @formatter:on
330
331         // start the pipeline
332         firstFuture.complete(outcome);
333
334         return controller;
335     }
336
337     /**
338      * Determines if the outcome was successful.
339      *
340      * @param outcome outcome to examine
341      * @return {@code true} if the outcome was successful
342      */
343     protected boolean isSuccess(ControlLoopOperation outcome) {
344         return OUTCOME_SUCCESS.equals(outcome.getOutcome());
345     }
346
347     /**
348      * Determines if the outcome was a failure for this operator.
349      *
350      * @param outcome outcome to examine, or {@code null}
351      * @return {@code true} if the outcome is not {@code null} and was a failure
352      *         <i>and</i> was associated with this operator, {@code false} otherwise
353      */
354     protected boolean isActorFailed(ControlLoopOperation outcome) {
355         return OUTCOME_FAILURE.equals(getActorOutcome(outcome));
356     }
357
358     /**
359      * Invokes the operation as a "future". This method simply invokes
360      * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future".
361      * <p/>
362      * This method assumes the following:
363      * <ul>
364      * <li>the operator is alive</li>
365      * <li>verifyRunning() has been invoked</li>
366      * <li>callbackStarted() has been invoked</li>
367      * <li>the invoker will perform appropriate timeout checks</li>
368      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
369      * </ul>
370      *
371      * @param params operation parameters
372      * @param attempt attempt number, typically starting with 1
373      * @return a function that will start the operation and return its result when
374      *         complete
375      */
376     protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
377                     ControlLoopOperationParams params, int attempt) {
378
379         /*
380          * TODO As doOperation() may perform blocking I/O, this should be launched in its
381          * own thread to prevent the ForkJoinPool from being tied up. Should probably
382          * provide a method to make that easy.
383          */
384
385         return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation),
386                         params.getExecutor());
387     }
388
389     /**
390      * Low-level method that performs the operation. This can make the same assumptions
391      * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
392      * method throws an {@link UnsupportedOperationException}.
393      *
394      * @param params operation parameters
395      * @param attempt attempt number, typically starting with 1
396      * @param operation the operation being performed
397      * @return the outcome of the operation
398      */
399     protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
400                     ControlLoopOperation operation) {
401
402         throw new UnsupportedOperationException("start operation " + getFullName());
403     }
404
405     /**
406      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
407      * FAILURE, assuming the policy specifies retries and the retry count has been
408      * exhausted.
409      *
410      * @param params operation parameters
411      * @param attempt latest attempt number, starting with 1
412      * @return a function to get the next future to execute
413      */
414     private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params,
415                     int attempt) {
416
417         return operation -> {
418             if (operation != null && !isActorFailed(operation)) {
419                 /*
420                  * wrong type or wrong operation - just leave it as is. No need to log
421                  * anything here, as retryOnFailure() will log a message
422                  */
423                 return operation;
424             }
425
426             // get a non-null operation
427             ControlLoopOperation oper2;
428             if (operation != null) {
429                 oper2 = operation;
430             } else {
431                 oper2 = params.makeOutcome();
432                 oper2.setOutcome(OUTCOME_FAILURE);
433             }
434
435             if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0
436                             && attempt > params.getPolicy().getRetry()) {
437                 /*
438                  * retries were specified and we've already tried them all - change to
439                  * FAILURE_RETRIES
440                  */
441                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
442                 oper2.setOutcome(OUTCOME_RETRIES);
443             }
444
445             return oper2;
446         };
447     }
448
449     /**
450      * Restarts the operation if it was a FAILURE. Assumes that
451      * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and
452      * thus that the "operation" is not {@code null}.
453      *
454      * @param params operation parameters
455      * @param controller controller for all of the retries
456      * @param attempt latest attempt number, starting with 1
457      * @return a function to get the next future to execute
458      */
459     private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure(
460                     ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller,
461                     int attempt) {
462
463         return operation -> {
464             if (!isActorFailed(operation)) {
465                 // wrong type or wrong operation - just leave it as is
466                 logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
467                 return CompletableFuture.completedFuture(operation);
468             }
469
470             if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) {
471                 // no retries - already marked as FAILURE, so just return it
472                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
473                 return CompletableFuture.completedFuture(operation);
474             }
475
476
477             /*
478              * Retry the operation.
479              */
480             logger.info("retry operation {} for {}", getFullName(), params.getRequestId());
481
482             return startOperationAttempt(params, controller, attempt + 1);
483         };
484     }
485
486     /**
487      * Gets the outcome of an operation for this operation.
488      *
489      * @param operation operation whose outcome is to be extracted
490      * @return the outcome of the given operation, if it's for this operator, {@code null}
491      *         otherwise
492      */
493     protected String getActorOutcome(ControlLoopOperation operation) {
494         if (operation == null) {
495             return null;
496         }
497
498         if (!getActorName().equals(operation.getActor())) {
499             return null;
500         }
501
502         if (!getName().equals(operation.getOperation())) {
503             return null;
504         }
505
506         return operation.getOutcome();
507     }
508
509     /**
510      * Gets a function that will start the next step, if the current operation was
511      * successful, or just return the current operation, otherwise.
512      *
513      * @param params operation parameters
514      * @param nextStep function that will invoke the next step, passing it the operation
515      * @return a function that will start the next step
516      */
517     protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess(
518                     ControlLoopOperationParams params,
519                     Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) {
520
521         return operation -> {
522
523             if (operation == null) {
524                 logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId());
525                 ControlLoopOperation outcome = params.makeOutcome();
526                 outcome.setOutcome(OUTCOME_FAILURE);
527                 return CompletableFuture.completedFuture(outcome);
528
529             } else if (isSuccess(operation)) {
530                 logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId());
531                 return nextStep.apply(operation);
532
533             } else {
534                 logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId());
535                 return CompletableFuture.completedFuture(operation);
536             }
537         };
538     }
539
540     /**
541      * Converts an exception into an operation outcome, returning a copy of the outcome to
542      * prevent background jobs from changing it.
543      *
544      * @param params operation parameters
545      * @param operation current operation
546      * @return a function that will convert an exception into an operation outcome
547      */
548     private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params,
549                     ControlLoopOperation operation) {
550
551         return thrown -> {
552             logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(),
553                             params.getRequestId(), thrown);
554
555             /*
556              * Must make a copy of the operation, as the original could be changed by
557              * background jobs that might still be running.
558              */
559             return setOutcome(params, new ControlLoopOperation(operation), thrown);
560         };
561     }
562
563     /**
564      * Gets a function to verify that the operation is still running. If the pipeline is
565      * not running, then it returns an incomplete future, which will effectively halt
566      * subsequent operations in the pipeline. This method is intended to be used with one
567      * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods.
568      *
569      * @param controller pipeline controller
570      * @param params operation parameters
571      * @return a function to verify that the operation is still running
572      */
573     protected <T> Function<T, CompletableFuture<T>> verifyRunning(
574                     PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) {
575
576         return value -> {
577             boolean running = controller.isRunning();
578             logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId());
579
580             return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>());
581         };
582     }
583
584     /**
585      * Sets the start time of the operation and invokes the callback to indicate that the
586      * operation has started. Does nothing if the pipeline has been stopped.
587      * <p/>
588      * This assumes that the "outcome" is not {@code null}.
589      *
590      * @param params operation parameters
591      * @param callbacks used to determine if the start callback can be invoked
592      * @return a function that sets the start time and invokes the callback
593      */
594     private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params,
595                     CallbackManager callbacks) {
596
597         return outcome -> {
598
599             if (callbacks.canStart()) {
600                 // haven't invoked "start" callback yet
601                 outcome.setStart(callbacks.getStartTime());
602                 outcome.setEnd(null);
603                 params.callbackStarted(outcome);
604             }
605
606             return outcome;
607         };
608     }
609
610     /**
611      * Sets the end time of the operation and invokes the callback to indicate that the
612      * operation has completed. Does nothing if the pipeline has been stopped.
613      * <p/>
614      * This assumes that the "outcome" is not {@code null}.
615      * <p/>
616      * Note: the start time must be a reference rather than a plain value, because it's
617      * value must be gotten on-demand, when the returned function is executed at a later
618      * time.
619      *
620      * @param params operation parameters
621      * @param callbacks used to determine if the end callback can be invoked
622      * @return a function that sets the end time and invokes the callback
623      */
624     private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params,
625                     CallbackManager callbacks) {
626
627         return operation -> {
628
629             if (callbacks.canEnd()) {
630                 operation.setStart(callbacks.getStartTime());
631                 operation.setEnd(callbacks.getEndTime());
632                 params.callbackCompleted(operation);
633             }
634
635             return operation;
636         };
637     }
638
639     /**
640      * Sets an operation's outcome and message, based on a throwable.
641      *
642      * @param params operation parameters
643      * @param operation operation to be updated
644      * @return the updated operation
645      */
646     protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
647                     Throwable thrown) {
648         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
649         return setOutcome(params, operation, result);
650     }
651
652     /**
653      * Sets an operation's outcome and default message based on the result.
654      *
655      * @param params operation parameters
656      * @param operation operation to be updated
657      * @param result result of the operation
658      * @return the updated operation
659      */
660     protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
661                     PolicyResult result) {
662         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
663         operation.setOutcome(result.toString());
664         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
665                         : ControlLoopOperation.FAILED_MSG);
666
667         return operation;
668     }
669
670     /**
671      * Determines if a throwable is due to a timeout.
672      *
673      * @param thrown throwable of interest
674      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
675      */
676     protected boolean isTimeout(Throwable thrown) {
677         if (thrown instanceof CompletionException) {
678             thrown = thrown.getCause();
679         }
680
681         return (thrown instanceof TimeoutException);
682     }
683
684     // these may be overridden by junit tests
685
686     /**
687      * Gets the operation timeout. Subclasses may override this method to obtain the
688      * timeout in some other way (e.g., through configuration properties).
689      *
690      * @param policy policy from which to extract the timeout
691      * @return the operation timeout, in milliseconds
692      */
693     protected long getTimeOutMillis(Policy policy) {
694         Integer timeoutSec = policy.getTimeout();
695         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
696     }
697
698     /**
699      * Manager for "start" and "end" callbacks.
700      */
701     private static class CallbackManager implements Runnable {
702         private final AtomicReference<Instant> startTime = new AtomicReference<>();
703         private final AtomicReference<Instant> endTime = new AtomicReference<>();
704
705         /**
706          * Determines if the "start" callback can be invoked. If so, it sets the
707          * {@link #startTime} to the current time.
708          *
709          * @return {@code true} if the "start" callback can be invoked, {@code false}
710          *         otherwise
711          */
712         public boolean canStart() {
713             return startTime.compareAndSet(null, Instant.now());
714         }
715
716         /**
717          * Determines if the "end" callback can be invoked. If so, it sets the
718          * {@link #endTime} to the current time.
719          *
720          * @return {@code true} if the "end" callback can be invoked, {@code false}
721          *         otherwise
722          */
723         public boolean canEnd() {
724             return endTime.compareAndSet(null, Instant.now());
725         }
726
727         /**
728          * Gets the start time.
729          *
730          * @return the start time, or {@code null} if {@link #canStart()} has not been
731          *         invoked yet.
732          */
733         public Instant getStartTime() {
734             return startTime.get();
735         }
736
737         /**
738          * Gets the end time.
739          *
740          * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
741          *         yet.
742          */
743         public Instant getEndTime() {
744             return endTime.get();
745         }
746
747         /**
748          * Prevents further callbacks from being executed by setting {@link #startTime}
749          * and {@link #endTime}.
750          */
751         @Override
752         public void run() {
753             canStart();
754             canEnd();
755         }
756     }
757 }