[BUG] Make failed async task report failure on Kafka topic
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / utils / data / operation / ResourceDataOperationRequestUtilsSpec.groovy
index 5690b8f..8df27bb 100644 (file)
@@ -26,6 +26,7 @@ import io.cloudevents.kafka.CloudEventDeserializer
 import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.cps.events.EventsPublisher
+import org.onap.cps.ncmp.api.NcmpResponseStatus
 import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
 import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
@@ -38,14 +39,15 @@ import org.onap.cps.utils.JsonObjectMapper
 import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.test.context.ContextConfiguration
+
 import java.time.Duration
+import java.util.concurrent.TimeoutException
 
 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
 
 @ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper])
 class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
 
-    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
     def static clientTopic = 'my-topic-name'
     def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
 
@@ -90,6 +92,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
 
     def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() {
         given: 'consumer subscribing to client topic'
+            def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer))
             cloudEventKafkaConsumer.subscribe([clientTopic])
         and: 'data operation request having non-ready and non-existing cm handle ids'
             def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
@@ -97,7 +100,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
         when: 'data operation request is processed'
             ResourceDataOperationRequestUtils.processPerDefinitionInDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, yangModelCmHandles)
         and: 'subscribed client specified topic is polled and first record is selected'
-            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
         then: 'verify cloud compliant headers'
             def consumerRecordOutHeaders = consumerRecordOut.headers()
             assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') != null
@@ -111,10 +114,34 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
         and: 'data operation response event response size is 3'
             dataOperationResponseEvent.data.responses.size() == 3
         and: 'verify published data operation response as json string'
-        def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
+            def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
             jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
     }
 
+    def 'Publish error response for entire data operations request when async task fails'() {
+        given: 'consumer subscribing to client topic'
+            def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer))
+            cloudEventKafkaConsumer.subscribe([clientTopic])
+        and: 'data operation request having non-ready and non-existing cm handle ids'
+            def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
+            def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
+        when: 'an error occurs for the entire data operations request'
+            ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown)
+        and: 'subscribed client specified topic is polled and first record is selected'
+            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
+            def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
+        then: 'data operation response event response size is 3'
+            dataOperationResponseEvent.data.responses.size() == 3
+        and: 'all 3 have the expected error code'
+            dataOperationResponseEvent.data.responses.each {
+                assert it.statusCode == errorReportedToClientTopic.code
+            }
+        where:
+            scenario             | exceptionThrown        | consumerGroupId || errorReportedToClientTopic
+            'task timed out'     | new TimeoutException() | 'test-2'        || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING
+            'unspecified error'  | new RuntimeException() | 'test-3'        || NcmpResponseStatus.UNKNOWN_ERROR
+    }
+
     static def getYangModelCmHandles() {
         def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
         def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()