[BUG] Make failed async task report failure on Kafka topic 23/137723/3
author danielhanrahan <daniel.hanrahan@est.tech>
Tue, 16 Apr 2024 21:04:44 +0000 (22:04 +0100)
committer danielhanrahan <daniel.hanrahan@est.tech>
Tue, 23 Apr 2024 09:13:49 +0000 (10:13 +0100)
- In the event of an async task timeout, error code 102
  (DMI_SERVICE_NOT_RESPONDING) is sent to the client topic.
- In the event of an unexpected error (such as the database being
  unavailable), error code 108 (UNKNOWN_ERROR) is sent to the client topic.
- The default timeouts have been adjusted so that the task
  timeout (60s) is longer than the HTTP and Database timeouts (30s),
  so that expected codes are returned.

Issue-ID: CPS-2186
Signed-off-by: danielhanrahan <daniel.hanrahan@est.tech>
Change-Id: I84c3447a625e084c445ab2f5c01e2b32a0c971ac

cps-application/src/main/resources/application.yml
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
docs/cps-ncmp-message-status-codes.rst
docs/release-notes.rst
integration-test/src/test/resources/application.yml

index 3c263e3..27bc6c6 100644 (file)
@@ -52,7 +52,7 @@ spring:
             minimumIdle: 5
             maximumPoolSize: 80
             idleTimeout: 60000
-            connectionTimeout: 120000
+            connectionTimeout: 30000
             leakDetectionThreshold: 30000
             pool-name: CpsDatabasePool
 
@@ -170,7 +170,7 @@ logging:
 ncmp:
     dmi:
         httpclient:
-            connectionTimeoutInSeconds: 180
+            connectionTimeoutInSeconds: 30
             maximumConnectionsPerRoute: 50
             maximumConnectionsTotal: 100
             idleConnectionEvictionThresholdInSeconds: 5
index 64497b9..1f87865 100644 (file)
@@ -25,11 +25,13 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ;
 
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import org.onap.cps.ncmp.api.NetworkCmProxyDataService;
 import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException;
 import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
 import org.onap.cps.ncmp.api.impl.operations.OperationType;
+import org.onap.cps.ncmp.api.impl.utils.data.operation.ResourceDataOperationRequestUtils;
 import org.onap.cps.ncmp.api.models.CmResourceAddress;
 import org.onap.cps.ncmp.api.models.DataOperationRequest;
 import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException;
@@ -99,8 +101,9 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
             final DataOperationRequest dataOperationRequest,
             final String authorization) {
         final String requestId = UUID.randomUUID().toString();
-        cpsNcmpTaskExecutor.executeTask(
+        cpsNcmpTaskExecutor.executeTaskWithErrorHandling(
             getTaskSupplierForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId, authorization),
+            getTaskCompletionHandlerForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId),
             timeOutInMilliSeconds);
         return ResponseEntity.ok(Map.of("requestId", requestId));
     }
@@ -139,4 +142,13 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
         };
     }
 
+    private static BiConsumer<Object, Throwable> getTaskCompletionHandlerForDataOperationRequest(
+            final String topicParamInQuery,
+            final DataOperationRequest dataOperationRequest,
+            final String requestId) {
+        return (result, throwable) ->
+                ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(topicParamInQuery,
+                        requestId, dataOperationRequest, throwable);
+    }
+
 }
index ba68d5b..2601c7a 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2023 Nordix Foundation
+ *  Copyright (C) 2022-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ package org.onap.cps.ncmp.rest.executor;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -31,16 +32,30 @@ import org.springframework.stereotype.Service;
 @Service
 public class CpsNcmpTaskExecutor {
 
+    /**
+     * Execute a task asynchronously, and invoke completion handler when done.
+     *
+     * @param taskSupplier supplier whose get() method performs the task to be executed asynchronously
+     * @param taskCompletionHandler the action to perform on task completion or error
+     * @param timeOutInMillis the time-out value in milliseconds
+     */
+    public void executeTaskWithErrorHandling(final Supplier<Object> taskSupplier,
+                                             final BiConsumer<Object, Throwable> taskCompletionHandler,
+                                             final long timeOutInMillis) {
+        CompletableFuture.supplyAsync(taskSupplier)
+                .orTimeout(timeOutInMillis, MILLISECONDS)
+                .whenCompleteAsync(taskCompletionHandler);
+    }
+
     /**
      * Execute a task asynchronously.
      *
-     * @param taskSupplier functional method is get() task need to executed asynchronously
+     * @param taskSupplier supplier whose get() method performs the task to be executed asynchronously
      * @param timeOutInMillis the time-out value in milliseconds
      */
     public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
-        CompletableFuture.supplyAsync(taskSupplier::get)
-            .orTimeout(timeOutInMillis, MILLISECONDS)
-            .whenCompleteAsync((taskResult, throwable) -> handleTaskCompletion(throwable));
+        executeTaskWithErrorHandling(taskSupplier, (taskResult, throwable) -> handleTaskCompletion(throwable),
+                timeOutInMillis);
     }
 
     private void handleTaskCompletion(final Throwable throwable) {
index a5b1f05..2d7e9b2 100644 (file)
@@ -198,7 +198,7 @@ class NetworkCmProxyControllerSpec extends Specification {
         and: 'async request id is generated'
             assert response.contentAsString.contains('requestId')
         then: 'the request is handled asynchronously'
-            1 * mockCpsTaskExecutor.executeTask(*_)
+            1 * mockCpsTaskExecutor.executeTaskWithErrorHandling(*_)
         where: 'the following data stores are used'
             datastore << [PASSTHROUGH_RUNNING, PASSTHROUGH_OPERATIONAL]
     }
index 1585616..641715d 100644 (file)
@@ -79,7 +79,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
         when: 'data operation request is executed'
             objectUnderTest.executeRequest('someTopic', new DataOperationRequest(), NO_AUTH_HEADER)
         then: 'the task is executed in an async fashion or not'
-            expectedCalls * spiedCpsNcmpTaskExecutor.executeTask(*_)
+            expectedCalls * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
         where: 'the following parameters are used'
             scenario | notificationFeatureEnabled || expectedCalls
             'on'     | true                       || 1
@@ -101,7 +101,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
         when: 'data operation request is executed'
             objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER)
         then: 'the task is executed in an async fashion'
-            1 * spiedCpsNcmpTaskExecutor.executeTask(*_)
+            1 * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
         and: 'the network service is invoked'
             new PollingConditions().within(1) {
                 assert networkServiceMethodCalled == true
index 010eda9..4c8c40f 100644 (file)
@@ -33,6 +33,7 @@ class CpsNcmpTaskExecutorSpec extends Specification {
     def objectUnderTest = new CpsNcmpTaskExecutor()
     def logger = Spy(ListAppender<ILoggingEvent>)
     def enoughTime = 100
+    def notEnoughTime = 10
 
     void setup() {
         ((Logger) LoggerFactory.getLogger(CpsNcmpTaskExecutor.class)).addAppender(logger)
@@ -67,6 +68,18 @@ class CpsNcmpTaskExecutorSpec extends Specification {
             assert loggingEvent.formattedMessage.contains('original exception message')
     }
 
+    def 'Task times out.'() {
+        when: 'task is executed without enough time to complete'
+            objectUnderTest.executeTask(taskSupplierForLongRunningTask(), notEnoughTime)
+        then: 'an event is logged with level ERROR'
+            new PollingConditions().within(1) {
+                def loggingEvent = getLoggingEvent()
+                assert loggingEvent.level == Level.ERROR
+            }
+        and: 'a timeout error message is logged'
+            assert loggingEvent.formattedMessage.contains('java.util.concurrent.TimeoutException')
+    }
+
     def taskSupplier() {
         return () -> 'hello world'
     }
@@ -75,6 +88,10 @@ class CpsNcmpTaskExecutorSpec extends Specification {
         return () -> { throw new RuntimeException('original exception message') }
     }
 
+    def taskSupplierForLongRunningTask() {
+        return () -> { sleep(enoughTime) }
+    }
+
     def getLoggingEvent() {
         return logger.list[0]
     }
index a8b4e28..4b016b3 100644 (file)
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -109,28 +110,80 @@ public class ResourceDataOperationRequestUtils {
                     DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
                     CM_HANDLES_NOT_READY, nonReadyCmHandleIds);
         }
-        if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
-            publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
-        }
+        publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
         return dmiDataOperationsOutPerDmiServiceName;
     }
 
+    /**
+     * Handles async task completion for an entire data operations request, publishing failures to client topic.
+     *
+     * @param topicParamInQuery      client given topic
+     * @param requestId              unique identifier per request
+     * @param dataOperationRequest   incoming data operation request details
+     * @param throwable              error cause, or null if task completed with no exception
+     */
+    public static void handleAsyncTaskCompletionForDataOperationsRequest(
+            final String topicParamInQuery,
+            final String requestId,
+            final DataOperationRequest dataOperationRequest,
+            final Throwable throwable) {
+        if (throwable == null) {
+            log.info("Data operations request {} completed.", requestId);
+        } else if (throwable instanceof TimeoutException) {
+            log.error("Data operations request {} timed out.", requestId);
+            ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+                    requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING);
+        } else {
+            log.error("Data operations request {} failed.", requestId, throwable);
+            ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+                    requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR);
+        }
+    }
+
+    /**
+     * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic.
+     *
+     * @param topicParamInQuery      client given topic
+     * @param requestId              unique identifier per request
+     * @param dataOperationRequestIn incoming data operation request details
+     * @param ncmpResponseStatus     response code to be sent for all cm handle ids in all operations
+     */
+    private static void publishErrorMessageToClientTopicForEntireOperation(
+            final String topicParamInQuery,
+            final String requestId,
+            final DataOperationRequest dataOperationRequestIn,
+            final NcmpResponseStatus ncmpResponseStatus) {
+
+        final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
+                cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
+
+        for (final DataOperationDefinition dataOperationDefinitionIn :
+                dataOperationRequestIn.getDataOperationDefinitions()) {
+            cmHandleIdsPerResponseCodesPerOperation.add(
+                    DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
+                    Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds()));
+        }
+        publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
+    }
+
     /**
      * Creates data operation cloud event and publish it to client topic.
      *
      * @param clientTopic                              client given topic
      * @param requestId                                unique identifier per request
-     * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code
+     * @param cmHandleIdsPerResponseCodesPerOperation  list of cm handle ids per operation with response code
      */
     public static void publishErrorMessageToClientTopic(final String clientTopic,
                                                          final String requestId,
                                                          final MultiValueMap<DmiDataOperation,
                                                                  Map<NcmpResponseStatus, List<String>>>
                                                                     cmHandleIdsPerResponseCodesPerOperation) {
-        final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
-                requestId, cmHandleIdsPerResponseCodesPerOperation);
-        final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
-        eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+        if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
+            final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
+                    requestId, cmHandleIdsPerResponseCodesPerOperation);
+            final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+            eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+        }
     }
 
     private static Map<String, String> getDmiServiceNamesPerCmHandleId(
index 5690b8f..8df27bb 100644 (file)
@@ -26,6 +26,7 @@ import io.cloudevents.kafka.CloudEventDeserializer
 import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.cps.events.EventsPublisher
+import org.onap.cps.ncmp.api.NcmpResponseStatus
 import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
 import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
@@ -38,14 +39,15 @@ import org.onap.cps.utils.JsonObjectMapper
 import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.test.context.ContextConfiguration
+
 import java.time.Duration
+import java.util.concurrent.TimeoutException
 
 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
 
 @ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper])
 class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
 
-    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
     def static clientTopic = 'my-topic-name'
     def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
 
@@ -90,6 +92,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
 
     def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() {
         given: 'consumer subscribing to client topic'
+            def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer))
             cloudEventKafkaConsumer.subscribe([clientTopic])
         and: 'data operation request having non-ready and non-existing cm handle ids'
             def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
@@ -97,7 +100,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
         when: 'data operation request is processed'
             ResourceDataOperationRequestUtils.processPerDefinitionInDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, yangModelCmHandles)
         and: 'subscribed client specified topic is polled and first record is selected'
-            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
         then: 'verify cloud compliant headers'
             def consumerRecordOutHeaders = consumerRecordOut.headers()
             assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') != null
@@ -111,10 +114,34 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
         and: 'data operation response event response size is 3'
             dataOperationResponseEvent.data.responses.size() == 3
         and: 'verify published data operation response as json string'
-        def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
+            def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
             jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
     }
 
+    def 'Publish error response for entire data operations request when async task fails'() {
+        given: 'consumer subscribing to client topic'
+            def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer))
+            cloudEventKafkaConsumer.subscribe([clientTopic])
+        and: 'data operation request having non-ready and non-existing cm handle ids'
+            def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
+            def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
+        when: 'an error occurs for the entire data operations request'
+            ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown)
+        and: 'subscribed client specified topic is polled and last record is selected'
+            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
+            def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
+        then: 'data operation response event response size is 3'
+            dataOperationResponseEvent.data.responses.size() == 3
+        and: 'all 3 have the expected error code'
+            dataOperationResponseEvent.data.responses.each {
+                assert it.statusCode == errorReportedToClientTopic.code
+            }
+        where:
+            scenario             | exceptionThrown        | consumerGroupId || errorReportedToClientTopic
+            'task timed out'     | new TimeoutException() | 'test-2'        || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING
+            'unspecified error'  | new RuntimeException() | 'test-3'        || NcmpResponseStatus.UNKNOWN_ERROR
+    }
+
     static def getYangModelCmHandles() {
         def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
         def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()
index 018cf4a..0c6ce0e 100644 (file)
@@ -16,7 +16,7 @@ CPS-NCMP Message Status Codes
     +-----------------+------------------------------------------------------+-----------------------------------+
     | 1               | successfully applied subscription                    | CM Data Notification Subscription |
     +-----------------+------------------------------------------------------+-----------------------------------+
-    | 100             | cm handle id(s) is(are) not found                    | Data Operation, Inventory         |
+    | 100             | cm handle id(s) is(are) not found                    | All features                      |
     +-----------------+------------------------------------------------------+-----------------------------------+
     | 101             | cm handle(s) not ready                               | Data Operation                    |
     +-----------------+------------------------------------------------------+-----------------------------------+
@@ -32,7 +32,7 @@ CPS-NCMP Message Status Codes
     +-----------------+------------------------------------------------------+-----------------------------------+
     | 107             | southbound system is busy                            | Data Operation                    |
     +-----------------+------------------------------------------------------+-----------------------------------+
-    | 108             | Unknown error                                        | Inventory                         |
+    | 108             | Unknown error                                        | All features                      |
     +-----------------+------------------------------------------------------+-----------------------------------+
     | 109             | cm-handle already exists                             | Inventory                         |
     +-----------------+------------------------------------------------------+-----------------------------------+
index d35ed99..f04f977 100644 (file)
@@ -36,6 +36,11 @@ Release Data
 |                                      |                                                        |
 +--------------------------------------+--------------------------------------------------------+
 
+Bug Fixes
+---------
+3.4.8
+    - `CPS-2186 <https://jira.onap.org/browse/CPS-2186>`_ Report async task failures to client topic during data operations request
+
 Features
 --------
 
index 3d61bdb..6fd3bca 100644 (file)
@@ -48,7 +48,7 @@ spring:
       minimumIdle: 5
       maximumPoolSize: 80
       idleTimeout: 60000
-      connectionTimeout: 120000
+      connectionTimeout: 30000
       leakDetectionThreshold: 30000
       pool-name: CpsDatabasePool
 
@@ -120,7 +120,7 @@ notification:
       queue-capacity: 500
       wait-for-tasks-to-complete-on-shutdown: true
       thread-name-prefix: Async-
-      time-out-value-in-ms: 2000
+      time-out-value-in-ms: 60000
 
 springdoc:
   swagger-ui:
@@ -165,7 +165,7 @@ logging:
 ncmp:
   dmi:
     httpclient:
-      connectionTimeoutInSeconds: 180
+      connectionTimeoutInSeconds: 30
       maximumConnectionsPerRoute: 50
       maximumConnectionsTotal: 100
       idleConnectionEvictionThresholdInSeconds: 5