2 * Copyright © 2016-2018 European Support Limited
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.openecomp.sdcrests.item.rest.services.catalog.notification;
19 import java.util.Collection;
20 import java.util.Objects;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.function.BiFunction;
27 import org.openecomp.sdc.logging.api.Logger;
28 import org.openecomp.sdc.logging.api.LoggerFactory;
29 import org.openecomp.sdc.logging.api.LoggingContext;
30 import org.openecomp.sdcrests.item.types.ItemAction;
33 * Asynchronously runs a notification task.
38 public class AsyncNotifier implements Notifier {
40 private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
42 private static final int DEFAULT_NUM_OF_RETRIES = 2;
43 private static final long DEFAULT_INTERVAL = 5000;
45 private final BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer;
46 private final int numberOfRetries;
47 private final long retryInterval;
50 AsyncNotifier(BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer) {
51 this(taskProducer, DEFAULT_NUM_OF_RETRIES, DEFAULT_INTERVAL);
54 AsyncNotifier(BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer, int numOfRetries,
56 this.taskProducer = taskProducer;
57 this.numberOfRetries = numOfRetries;
58 this.retryInterval = retryInterval;
62 public void execute(Collection<String> itemIds, ItemAction action) {
64 Callable<AsyncNotifier.NextAction> worker = taskProducer.apply(itemIds, action);
66 RetryingTask retryingTask = new RetryingTask(worker, numberOfRetries, retryInterval, EXECUTOR_SERVICE);
68 EXECUTOR_SERVICE.submit(LoggingContext.copyToCallable(retryingTask));
71 public enum NextAction {
75 static class RetryingTask implements Callable<Void> {
77 private static final Logger LOGGER = LoggerFactory.getLogger(RetryingTask.class);
79 private final Callable<AsyncNotifier.NextAction> worker;
80 private final long delay;
81 private final ScheduledExecutorService scheduler;
82 private final AtomicInteger retries;
84 RetryingTask(Callable<AsyncNotifier.NextAction> worker, int numOfRetries, long delay,
85 ScheduledExecutorService scheduler) {
87 this.worker = Objects.requireNonNull(worker);
88 this.retries = new AtomicInteger(requirePositiveRetries(numOfRetries));
89 this.delay = requirePositiveDelay(delay);
90 this.scheduler = Objects.requireNonNull(scheduler);
93 private int requirePositiveRetries(int number) {
96 throw new IllegalArgumentException("Number of retries must be positive");
102 private long requirePositiveDelay(long number) {
105 throw new IllegalArgumentException("Delay must be positive");
112 public Void call() throws Exception {
114 NextAction next = worker.call();
115 if (next == NextAction.DONE) {
116 LOGGER.debug("Task successful: {}. Not going to retry", worker);
120 int attempts = retries.decrementAndGet();
122 LOGGER.warn("Exhausted number of retries for task {}, exiting", worker);
126 scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);