[BUG] Make failed async task report failure on Kafka topic
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / utils / data / operation / ResourceDataOperationRequestUtils.java
index a8b4e28..4b016b3 100644 (file)
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -109,28 +110,80 @@ public class ResourceDataOperationRequestUtils {
                     DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
                     CM_HANDLES_NOT_READY, nonReadyCmHandleIds);
         }
-        if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
-            publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
-        }
+        publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
         return dmiDataOperationsOutPerDmiServiceName;
     }
 
+    /**
+     * Handles the async task completion for an entire data, publishing errors to client topic on task failure.
+     *
+     * @param topicParamInQuery      client given topic
+     * @param requestId              unique identifier per request
+     * @param dataOperationRequest   incoming data operation request details
+     * @param throwable              error cause, or null if task completed with no exception
+     */
+    public static void handleAsyncTaskCompletionForDataOperationsRequest(
+            final String topicParamInQuery,
+            final String requestId,
+            final DataOperationRequest dataOperationRequest,
+            final Throwable throwable) {
+        if (throwable == null) {
+            log.info("Data operations request {} completed.", requestId);
+        } else if (throwable instanceof TimeoutException) {
+            log.error("Data operations request {} timed out.", requestId);
+            ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+                    requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING);
+        } else {
+            log.error("Data operations request {} failed.", requestId, throwable);
+            ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+                    requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR);
+        }
+    }
+
+    /**
+     * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic.
+     *
+     * @param topicParamInQuery      client given topic
+     * @param requestId              unique identifier per request
+     * @param dataOperationRequestIn incoming data operation request details
+     * @param ncmpResponseStatus     response code to be sent for all cm handle ids in all operations
+     */
+    private static void publishErrorMessageToClientTopicForEntireOperation(
+            final String topicParamInQuery,
+            final String requestId,
+            final DataOperationRequest dataOperationRequestIn,
+            final NcmpResponseStatus ncmpResponseStatus) {
+
+        final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
+                cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
+
+        for (final DataOperationDefinition dataOperationDefinitionIn :
+                dataOperationRequestIn.getDataOperationDefinitions()) {
+            cmHandleIdsPerResponseCodesPerOperation.add(
+                    DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
+                    Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds()));
+        }
+        publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
+    }
+
     /**
      * Creates data operation cloud event and publish it to client topic.
      *
      * @param clientTopic                              client given topic
      * @param requestId                                unique identifier per request
-     * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code
+     * @param cmHandleIdsPerResponseCodesPerOperation  list of cm handle ids per operation with response code
      */
     public static void publishErrorMessageToClientTopic(final String clientTopic,
                                                          final String requestId,
                                                          final MultiValueMap<DmiDataOperation,
                                                                  Map<NcmpResponseStatus, List<String>>>
                                                                     cmHandleIdsPerResponseCodesPerOperation) {
-        final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
-                requestId, cmHandleIdsPerResponseCodesPerOperation);
-        final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
-        eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+        if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
+            final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
+                    requestId, cmHandleIdsPerResponseCodesPerOperation);
+            final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+            eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+        }
     }
 
     private static Map<String, String> getDmiServiceNamesPerCmHandleId(