2 * Copyright © 2016-2018 European Support Limited
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.openecomp.sdcrests.item.rest.services.catalog.notification;
19 import java.util.Collection;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.BiFunction;
25 import org.openecomp.sdc.logging.api.Logger;
26 import org.openecomp.sdc.logging.api.LoggerFactory;
27 import org.openecomp.sdc.logging.api.LoggingContext;
28 import org.openecomp.sdcrests.item.types.ItemAction;
31 * Asynchronously runs a notification task.
36 public class AsyncNotifier implements Notifier {
38 private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
40 private static final int DEFAULT_NUM_OF_RETRIES = 2;
41 private static final long DEFAULT_INTERVAL = 5000;
43 private final BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer;
45 AsyncNotifier(BiFunction<Collection<String>, ItemAction, Callable<NextAction>> taskProducer) {
46 this.taskProducer = taskProducer;
50 public void execute(Collection<String> itemIds, ItemAction action) {
52 Callable<AsyncNotifier.NextAction> worker = taskProducer.apply(itemIds, action);
54 RetryingTask retryingTask =
55 new RetryingTask(worker, DEFAULT_NUM_OF_RETRIES, DEFAULT_INTERVAL, EXECUTOR_SERVICE);
57 EXECUTOR_SERVICE.submit(LoggingContext.copyToCallable(retryingTask));
60 public enum NextAction {
64 static class RetryingTask implements Callable<Void> {
66 private static final Logger LOGGER = LoggerFactory.getLogger(RetryingTask.class);
68 private final Callable<AsyncNotifier.NextAction> worker;
69 private final long delay;
70 private final ScheduledExecutorService scheduler;
71 private volatile int retries;
73 RetryingTask(Callable<AsyncNotifier.NextAction> worker, int numOfRetries, long delay,
74 ScheduledExecutorService scheduler) {
77 this.retries = numOfRetries;
79 this.scheduler = scheduler;
83 public synchronized Void call() throws Exception {
85 NextAction next = worker.call();
86 if (next == NextAction.DONE) {
87 LOGGER.debug("Task successful: {}. Not going to retry", worker);
93 LOGGER.warn("Exhausted number of retries for task {}, exiting", worker);
97 scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);