Add integration tests for legacy batch data operation 10/140710/3
author: danielhanrahan <daniel.hanrahan@est.tech>
Wed, 16 Apr 2025 11:20:58 +0000 (12:20 +0100)
committer: danielhanrahan <daniel.hanrahan@est.tech>
Wed, 23 Apr 2025 09:46:35 +0000 (10:46 +0100)
Add integration tests for:
1. happy path: batch request forwarded to DMI
2. negative case: not ready CM-handles report error
3. negative case: not existing CM-handles report error

Case 3 is ignored (commented out) as there is currently a bug which
causes the request to fail instead of reporting the error on the
kafka topic.

Issue-ID: CPS-2769
Signed-off-by: danielhanrahan <daniel.hanrahan@est.tech>
Change-Id: Ic31bf856c09dd63ba80019fc251a437458184bba

integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/LegacyBatchDataOperationSpec.groovy [new file with mode: 0644]

index 11c0ff2..556495e 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2024-2025 Nordix Foundation
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the 'License');
  *  you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
 package org.onap.cps.integration.base
 
 import groovy.json.JsonSlurper
+import java.util.regex.Matcher
 import okhttp3.mockwebserver.Dispatcher
 import okhttp3.mockwebserver.MockResponse
 import okhttp3.mockwebserver.RecordedRequest
@@ -28,9 +29,6 @@ import org.springframework.http.HttpHeaders
 import org.springframework.http.HttpStatus
 import org.springframework.http.MediaType
 
-import java.util.regex.Matcher
-import java.util.stream.Collectors
-
 import static org.onap.cps.integration.base.CpsIntegrationSpecBase.readResourceDataFile
 
 /**
@@ -60,6 +58,7 @@ class DmiDispatcher extends Dispatcher {
     def jsonSlurper = new JsonSlurper()
     def moduleNamesPerCmHandleId = [:]
     def receivedSubJobs = [:]
+    def receivedDataOperationRequest = [:]
     def lastAuthHeaderReceived
     def dmiResourceDataUrl
 
@@ -90,8 +89,9 @@ class DmiDispatcher extends Dispatcher {
                 return mockResponseWithBody(HttpStatus.OK, '{}')
 
             // legacy pass-through batch data operation
-            case ~'^/dmi/v1/data$':
-                return mockResponseWithBody(HttpStatus.ACCEPTED, '{}')
+            case ~'^/dmi/v1/data\\?requestId=(.*)&topic=(.*)$':
+                receivedDataOperationRequest = jsonSlurper.parseText(request.body.readUtf8())
+                return mockResponse(HttpStatus.ACCEPTED)
 
             // get data job status
             case ~'^/dmi/v1/cmwriteJob/dataProducer/(.*)/dataProducerJob/(.*)/status$':
@@ -112,7 +112,7 @@ class DmiDispatcher extends Dispatcher {
 
     def mockWriteJobResponse(request) {
         def destination = Matcher.lastMatcher[0][1]
-        def subJobWriteRequest = jsonSlurper.parseText(request.getBody().readUtf8())
+        def subJobWriteRequest = jsonSlurper.parseText(request.body.readUtf8())
         this.receivedSubJobs.put(destination, subJobWriteRequest)
         def response = '{"subJobId":"some sub job id"}'
         return mockResponseWithBody(HttpStatus.OK, response)
@@ -126,8 +126,8 @@ class DmiDispatcher extends Dispatcher {
     }
 
     def getModuleResourcesResponse(request, cmHandleId) {
-        def moduleResourcesRequest = jsonSlurper.parseText(request.getBody().readUtf8())
-        def requestedModuleNames = moduleResourcesRequest.get('data').get('modules').collect{it.get('name')}
+        def moduleResourcesRequest = jsonSlurper.parseText(request.body.readUtf8())
+        def requestedModuleNames = moduleResourcesRequest.data.modules.name
         def candidateModuleNames = getModuleNamesForCmHandle(cmHandleId)
         def moduleNames = candidateModuleNames.stream().filter(candidate -> requestedModuleNames.contains(candidate)).toList()
 
index 41798cb..d3a5c9a 100644 (file)
@@ -46,7 +46,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
         subscribeAndClearPreviousMessages()
     }
 
-    def cleanupSpec() {
+    def cleanup() {
         kafkaConsumer.unsubscribe()
         kafkaConsumer.close()
     }
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/LegacyBatchDataOperationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/LegacyBatchDataOperationSpec.groovy
new file mode 100644 (file)
index 0000000..83f9122
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the 'License');
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.integration.functional.ncmp
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import java.time.Duration
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.cps.integration.KafkaTestContainer
+import org.onap.cps.integration.base.CpsIntegrationSpecBase
+import org.onap.cps.ncmp.events.async1_0_0.Data
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
+import org.onap.cps.ncmp.events.async1_0_0.Response
+import org.springframework.http.MediaType
+import spock.util.concurrent.PollingConditions
+
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status
+
+class LegacyBatchDataOperationSpec extends CpsIntegrationSpecBase {
+
+    KafkaConsumer kafkaConsumer
+
+    def setup() {
+        kafkaConsumer = KafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
+        kafkaConsumer.subscribe(['legacy-batch-topic'])
+        kafkaConsumer.poll(Duration.ofMillis(500))
+        dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
+        registerCmHandle(DMI1_URL, 'ch-1', 'tagA', 'alt-1')
+        registerCmHandleWithoutWaitForReady(DMI1_URL, 'not-ready-ch', NO_MODULE_SET_TAG, 'alt-3')
+    }
+
+    def cleanup() {
+        deregisterCmHandles(DMI1_URL, ['ch-1', 'not-ready-ch'])
+        kafkaConsumer.unsubscribe()
+        kafkaConsumer.close()
+    }
+
+    def 'Batch pass-through data operation is forwarded to DMI plugin.'() {
+        given: 'a request body containing a data read operation for an existing and ready CM-handle'
+            def dataOperationRequestBody = makeDataOperationRequestBody('ch-1')
+
+        when: 'a pass-through batch data request is sent to NCMP is successful'
+            mvc.perform(post('/ncmp/v1/data')
+                    .queryParam('topic', 'legacy-batch-topic')
+                    .contentType(MediaType.APPLICATION_JSON)
+                    .content(dataOperationRequestBody)
+            ).andExpect(status().is2xxSuccessful())
+
+        then: 'DMI will receive the async request'
+            new PollingConditions().within(2, () -> {
+                assert dmiDispatcher1.receivedDataOperationRequest.isEmpty() == false
+            })
+        and: 'the request has one operation'
+            assert dmiDispatcher1.receivedDataOperationRequest.operations.size() == 1
+            def operation = dmiDispatcher1.receivedDataOperationRequest.operations[0]
+        and: 'the operation has the expected ID'
+            assert operation.operationId == '12'
+        and: 'the operation is for the expected CM-handles'
+            assert operation.cmHandles.id == ['ch-1']
+    }
+
+    def 'Batch pass-through data operation reports errors on kafka topic.'() {
+        given: 'a request body containing a data read operation for #cmHandleId'
+            def dataOperationRequestBody = makeDataOperationRequestBody(cmHandleId)
+
+        when: 'a pass-through batch data request is sent to NCMP specifying a kafka topic is successful'
+            mvc.perform(post('/ncmp/v1/data')
+                    .queryParam('topic', 'legacy-batch-topic')
+                    .contentType(MediaType.APPLICATION_JSON)
+                    .content(dataOperationRequestBody))
+                    .andExpect(status().is2xxSuccessful())
+
+        then: 'there is one kafka message'
+            def consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000))
+            assert consumerRecords.size() == 1
+
+        and: 'it is a cloud event'
+            assert consumerRecords[0].value() instanceof CloudEvent
+
+        and: 'it contains the data operation event with the expected error status'
+            def jsonData = new String(consumerRecords[0].value().data.toBytes())
+            def dataOperationEvent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent)
+            assert dataOperationEvent == new DataOperationEvent(data:
+                    new Data(responses: [
+                            new Response(
+                                operationId: 12,
+                                resourceIdentifier: 'ManagedElement=NRNode1/GNBDUFunction=1',
+                                options: '(fields=NRCellDU/attributes/cellLocalId)',
+                                ids: [cmHandleId],
+                                statusCode: expectedStatusCode,
+                                statusMessage: expectedStatusMessage,
+                                result: null),
+                    ]))
+
+        where:
+            scenario              | cmHandleId     || expectedStatusCode | expectedStatusMessage
+            'CM handle not ready' | 'not-ready-ch' || 101                | 'cm handle(s) not ready'
+            // FIXME BUG CPS-2769: CM handle not found causes batch to fail
+            // 'CM handle not found' | 'not-found-ch' || 100                | 'cm handle reference(s) not found'
+    }
+
+    def makeDataOperationRequestBody(cmHandleId) {
+        return """
+               {
+                    "operations": [
+                        {
+                            "operation": "read",
+                            "operationId": "12",
+                            "datastore": "ncmp-datastore:passthrough-operational",
+                            "resourceIdentifier": "ManagedElement=NRNode1/GNBDUFunction=1",
+                            "options": "(fields=NRCellDU/attributes/cellLocalId)",
+                            "targetIds": ["%s"]
+                        }
+                    ]
+                }
+        """.formatted(cmHandleId)
+    }
+
+}