872c61e480943d3675906f256be3c5f497a70968
[sdc.git] /
1 /*
2  * Copyright © 2016-2018 European Support Limited
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.openecomp.sdcrests.item.rest.services.catalog.notification;
18
19 import java.util.Collection;
20 import java.util.Objects;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.function.BiFunction;
27 import org.openecomp.sdc.logging.api.Logger;
28 import org.openecomp.sdc.logging.api.LoggerFactory;
29 import org.openecomp.sdc.logging.api.LoggingContext;
30 import org.openecomp.sdcrests.item.types.ItemAction;
31
32 /**
33  * Asynchronously runs a notification task.
34  *
35  * @author evitaliy
36  * @since 22 Nov 2018
37  */
38 public class AsyncNotifier implements Notifier {
39
40     private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
41
42     private static final int DEFAULT_NUM_OF_RETRIES = 2;
43     private static final long DEFAULT_INTERVAL = 5000;
44
45     private final BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer;
46     private final int numberOfRetries;
47     private final long retryInterval;
48
49
50     AsyncNotifier(BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer) {
51         this(taskProducer, DEFAULT_NUM_OF_RETRIES, DEFAULT_INTERVAL);
52     }
53
54     AsyncNotifier(BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer, int numOfRetries,
55             long retryInterval) {
56         this.taskProducer = taskProducer;
57         this.numberOfRetries = numOfRetries;
58         this.retryInterval = retryInterval;
59     }
60
61     @Override
62     public void execute(Collection<String> itemIds, ItemAction action) {
63
64         Callable<AsyncNotifier.NextAction> worker = taskProducer.apply(itemIds, action);
65
66         RetryingTask retryingTask = new RetryingTask(worker, numberOfRetries, retryInterval, EXECUTOR_SERVICE);
67
68         EXECUTOR_SERVICE.submit(LoggingContext.copyToCallable(retryingTask));
69     }
70
71     public enum NextAction {
72         RETRY, DONE
73     }
74
75     static class RetryingTask implements Callable<Void> {
76
77         private static final Logger LOGGER = LoggerFactory.getLogger(RetryingTask.class);
78
79         private final Callable<AsyncNotifier.NextAction> worker;
80         private final long delay;
81         private final ScheduledExecutorService scheduler;
82         private final AtomicInteger retries;
83
84         RetryingTask(Callable<AsyncNotifier.NextAction> worker, int numOfRetries, long delay,
85                 ScheduledExecutorService scheduler) {
86
87             this.worker = Objects.requireNonNull(worker);
88             this.retries = new AtomicInteger(requirePositiveRetries(numOfRetries));
89             this.delay = requirePositiveDelay(delay);
90             this.scheduler = Objects.requireNonNull(scheduler);
91         }
92
93         private int requirePositiveRetries(int number) {
94
95             if (number < 1) {
96                 throw new IllegalArgumentException("Number of retries must be positive");
97             }
98
99             return number;
100         }
101
102         private long requirePositiveDelay(long number) {
103
104             if (number < 1) {
105                 throw new IllegalArgumentException("Delay must be positive");
106             }
107
108             return number;
109         }
110
111         @Override
112         public Void call() throws Exception {
113
114             NextAction next = worker.call();
115             if (next == NextAction.DONE) {
116                 LOGGER.debug("Task successful: {}. Not going to retry", worker);
117                 return null;
118             }
119
120             int attempts = retries.decrementAndGet();
121             if (attempts < 1) {
122                 LOGGER.warn("Exhausted number of retries for task {}, exiting", worker);
123                 return null;
124             }
125
126             scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
127             return null;
128         }
129     }
130 }