82880106d77871c218c4191243df84eb005a9717
[sdc.git] /
1 /*
2  * Copyright © 2016-2018 European Support Limited
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.openecomp.sdcrests.item.rest.services.catalog.notification;
18
19 import java.util.Collection;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.BiFunction;
25 import org.openecomp.sdc.logging.api.Logger;
26 import org.openecomp.sdc.logging.api.LoggerFactory;
27 import org.openecomp.sdc.logging.api.LoggingContext;
28 import org.openecomp.sdcrests.item.types.ItemAction;
29
30 /**
31  * Asynchronously runs a notification task.
32  *
33  * @author evitaliy
34  * @since 22 Nov 2018
35  */
36 public class AsyncNotifier implements Notifier {
37
38     private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
39
40     private static final int DEFAULT_NUM_OF_RETRIES = 2;
41     private static final long DEFAULT_INTERVAL = 5000;
42
43     private final BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer;
44
45     AsyncNotifier(BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer) {
46         this.taskProducer = taskProducer;
47     }
48
49     @Override
50     public void execute(Collection<String> itemIds, ItemAction action) {
51
52         Callable<AsyncNotifier.NextAction> worker = taskProducer.apply(itemIds, action);
53
54         RetryingTask retryingTask =
55                 new RetryingTask(worker, DEFAULT_NUM_OF_RETRIES, DEFAULT_INTERVAL, EXECUTOR_SERVICE);
56
57         EXECUTOR_SERVICE.submit(LoggingContext.copyToCallable(retryingTask));
58     }
59
60     public enum NextAction {
61         RETRY, DONE
62     }
63
64     static class RetryingTask implements Callable<Void> {
65
66         private static final Logger LOGGER = LoggerFactory.getLogger(RetryingTask.class);
67
68         private final Callable<AsyncNotifier.NextAction> worker;
69         private final long delay;
70         private final ScheduledExecutorService scheduler;
71         private volatile int retries;
72
73         RetryingTask(Callable<AsyncNotifier.NextAction> worker, int numOfRetries, long delay,
74                 ScheduledExecutorService scheduler) {
75
76             this.worker = worker;
77             this.retries = numOfRetries;
78             this.delay = delay;
79             this.scheduler = scheduler;
80         }
81
82         @Override
83         public synchronized Void call() throws Exception {
84
85             NextAction next = worker.call();
86             if (next == NextAction.DONE) {
87                 LOGGER.debug("Task successful: {}. Not going to retry", worker);
88                 return null;
89             }
90
91             retries--;
92             if (retries == 0) {
93                 LOGGER.warn("Exhausted number of retries for task {}, exiting", worker);
94                 return null;
95             }
96
97             scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
98             return null;
99         }
100     }
101 }