2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2022-2023 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.api.impl.inventory.sync.executor;
23 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import jakarta.annotation.PostConstruct;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.TimeoutException;
30 import java.util.function.Supplier;
32 import lombok.extern.slf4j.Slf4j;
33 import org.springframework.beans.factory.annotation.Value;
34 import org.springframework.stereotype.Service;
38 public class AsyncTaskExecutor {
40 @Value("${ncmp.modules-sync-watchdog.async-executor.parallelism-level:10}")
42 private int asyncTaskParallelismLevel;
43 private ExecutorService executorService;
44 private static final int DEFAULT_PARALLELISM_LEVEL = 10;
47 * Set up executor service with thread-pool size as per configuration parameter.
48 * If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied.
51 public void setupThreadPool() {
52 executorService = Executors.newWorkStealingPool(
53 asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel);
57 * Execute supplied task asynchronously.
59 * @param taskSupplier functional method is get() task need to executed asynchronously
60 * @param timeOutInMillis the task timeout value in milliseconds
62 public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
63 CompletableFuture.supplyAsync(taskSupplier::get, executorService)
64 .orTimeout(timeOutInMillis, MILLISECONDS)
65 .whenCompleteAsync(this::handleTaskCompletion);
68 private void handleTaskCompletion(final Object response, final Throwable throwable) {
69 if (throwable != null) {
70 if (throwable instanceof TimeoutException) {
71 log.warn("Async task didn't completed within the required time.");
73 log.debug("Watchdog async batch failed. caused by : {}", throwable.getMessage());