2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2022 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.dmi.notifications.async;
23 import com.google.gson.JsonObject;
24 import java.util.HashMap;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Supplier;
29 import lombok.RequiredArgsConstructor;
30 import lombok.extern.slf4j.Slf4j;
31 import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException;
32 import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
33 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
34 import org.springframework.http.HttpStatus;
35 import org.springframework.stereotype.Service;
39 @RequiredArgsConstructor
40 public class AsyncTaskExecutor {
42 private final DmiAsyncRequestResponseEventProducer dmiAsyncRequestResponseEventProducer;
44 private static final DmiAsyncRequestResponseEventCreator dmiAsyncRequestResponseEventCreator =
45 new DmiAsyncRequestResponseEventCreator();
47 private static final Map<DataAccessRequest.OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
50 operationToHttpStatusMap.put(null, HttpStatus.OK);
51 operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.READ, HttpStatus.OK);
52 operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.CREATE, HttpStatus.CREATED);
53 operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.PATCH, HttpStatus.OK);
54 operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.UPDATE, HttpStatus.OK);
55 operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.DELETE, HttpStatus.NO_CONTENT);
59 * Execute task asynchronously and publish response to supplied topic.
61 * @param taskSupplier functional method is get() task need to executed asynchronously
62 * @param topicName topic name where message need to be published
63 * @param requestId unique requestId for async request
64 * @param operation the operation performed
65 * @param timeOutInMilliSeconds task timeout in milliseconds
67 public void executeAsyncTask(final Supplier<String> taskSupplier,
68 final String topicName,
69 final String requestId,
70 final DataAccessRequest.OperationEnum operation,
71 final int timeOutInMilliSeconds) {
72 CompletableFuture.supplyAsync(taskSupplier::get)
73 .orTimeout(timeOutInMilliSeconds, TimeUnit.MILLISECONDS)
74 .whenCompleteAsync((resourceDataAsJson, throwable) -> {
75 if (throwable == null) {
76 final String status = operationToHttpStatusMap.get(operation).getReasonPhrase();
77 final String code = String.valueOf(operationToHttpStatusMap.get(operation).value());
78 publishAsyncEvent(topicName, requestId, resourceDataAsJson, status, code);
80 log.error("Error occurred with async request {}", throwable.getMessage());
81 publishAsyncFailureEvent(topicName, requestId, throwable);
84 log.info("Async task completed.");
87 private void publishAsyncEvent(final String topicName,
88 final String requestId,
89 final String resourceDataAsJson,
92 final DmiAsyncRequestResponseEvent cpsAsyncRequestResponseEvent =
93 dmiAsyncRequestResponseEventCreator.createEvent(resourceDataAsJson, topicName, requestId, status, code);
95 dmiAsyncRequestResponseEventProducer.sendMessage(requestId, cpsAsyncRequestResponseEvent);
98 private void publishAsyncFailureEvent(final String topicName,
99 final String requestId,
100 final Throwable throwable) {
101 HttpStatus httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
103 if (throwable instanceof HttpClientRequestException) {
104 final HttpClientRequestException httpClientRequestException = (HttpClientRequestException) throwable;
105 httpStatus = httpClientRequestException.getHttpStatus();
108 final JsonObject errorDetails = new JsonObject();
109 errorDetails.addProperty("errorDetails", throwable.getMessage());
113 errorDetails.toString(),
114 httpStatus.getReasonPhrase(),
115 String.valueOf(httpStatus.value())