/*
 * ============LICENSE_START=======================================================
 * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the 'License');
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an 'AS IS' BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.cps.integration.functional.ncmp

import io.cloudevents.CloudEvent
import io.cloudevents.kafka.CloudEventDeserializer
import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.cps.integration.KafkaTestContainer
import org.onap.cps.integration.base.CpsIntegrationSpecBase
import org.onap.cps.ncmp.events.async1_0_0.Data
import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
import org.onap.cps.ncmp.events.async1_0_0.Response
import org.springframework.http.MediaType
import spock.util.concurrent.PollingConditions

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status

class LegacyBatchDataOperationSpec extends CpsIntegrationSpecBase {

    KafkaConsumer kafkaConsumer

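    // Test fixture: a CloudEvent consumer subscribed to the legacy batch topic, plus one ready
    // CM handle ('ch-1') and one that is registered but never becomes ready ('not-ready-ch').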
    def setup() {
        kafkaConsumer = KafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
        kafkaConsumer.subscribe(['legacy-batch-topic'])
        // initial poll to complete partition assignment and drain any stale records
        kafkaConsumer.poll(Duration.ofMillis(500))
        dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
        registerCmHandle(DMI1_URL, 'ch-1', 'tagA', 'alt-1')
        registerCmHandleWithoutWaitForReady(DMI1_URL, 'not-ready-ch', NO_MODULE_SET_TAG, 'alt-3')
    }

    def cleanup() {
        deregisterCmHandles(DMI1_URL, ['ch-1', 'not-ready-ch'])
        kafkaConsumer.unsubscribe()
    }

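    // Happy path: a batch read for a ready CM handle is forwarded asynchronously to the DMI plugin.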
    def 'Batch pass-through data operation is forwarded to DMI plugin.'() {
        given: 'a request body containing a data read operation for an existing and ready CM-handle'
            def dataOperationRequestBody = makeDataOperationRequestBody('ch-1')
        when: 'a pass-through batch data request sent to NCMP is successful'
            mvc.perform(post('/ncmp/v1/data')
                    .queryParam('topic', 'legacy-batch-topic')
                    .contentType(MediaType.APPLICATION_JSON)
                    .content(dataOperationRequestBody)
            ).andExpect(status().is2xxSuccessful())
        then: 'DMI will receive the async request'
            new PollingConditions().within(2, () -> {
                assert dmiDispatcher1.receivedDataOperationRequest.isEmpty() == false
            })
        and: 'the request has one operation'
            assert dmiDispatcher1.receivedDataOperationRequest.operations.size() == 1
            def operation = dmiDispatcher1.receivedDataOperationRequest.operations[0]
        and: 'the operation has the expected ID'
            assert operation.operationId == '12'
        and: 'the operation is for the expected CM-handles'
            assert operation.cmHandles.id == ['ch-1']
    }

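    // Error path: operations that cannot be forwarded are reported as a DataOperationEvent
    // on the client-supplied kafka topic instead of being sent to the DMI plugin.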
    def 'Batch pass-through data operation reports errors on kafka topic.'() {
        given: 'a request body containing a data read operation for #cmHandleId'
            def dataOperationRequestBody = makeDataOperationRequestBody(cmHandleId)
        when: 'a pass-through batch data request sent to NCMP specifying a kafka topic is successful'
            mvc.perform(post('/ncmp/v1/data')
                    .queryParam('topic', 'legacy-batch-topic')
                    .contentType(MediaType.APPLICATION_JSON)
                    .content(dataOperationRequestBody))
                    .andExpect(status().is2xxSuccessful())
        then: 'there is one kafka message'
            def consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000))
            assert consumerRecords.size() == 1
        and: 'it is a cloud event'
            assert consumerRecords[0].value() instanceof CloudEvent
        and: 'it contains the data operation event with the expected error status'
            def jsonData = new String(consumerRecords[0].value().data.toBytes())
            def dataOperationEvent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent)
            assert dataOperationEvent == new DataOperationEvent(data:
                new Data(responses: [
                    new Response(
                        operationId: '12',
                        resourceIdentifier: 'ManagedElement=NRNode1/GNBDUFunction=1',
                        options: '(fields=NRCellDU/attributes/cellLocalId)',
                        ids: [cmHandleId],
                        statusCode: expectedStatusCode,
                        statusMessage: expectedStatusMessage)
                ]))
        where:
            scenario              | cmHandleId     || expectedStatusCode | expectedStatusMessage
            'CM handle not ready' | 'not-ready-ch' || 101                | 'cm handle(s) not ready'
            // FIXME BUG CPS-2769: CM handle not found causes batch to fail
            // 'CM handle not found' | 'not-found-ch' || 100              | 'cm handle reference(s) not found'
    }

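    // Builds the request body for a single read operation (operationId '12') against the
    // passthrough-operational datastore. The JSON layout below is a sketch assuming the standard
    // NCMP /ncmp/v1/data batch request format (an 'operations' array with 'targetIds').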
    def makeDataOperationRequestBody(cmHandleId) {
        return """
            { "operations": [{
                "operation": "read",
                "operationId": "12",
                "datastore": "ncmp-datastore:passthrough-operational",
                "resourceIdentifier": "ManagedElement=NRNode1/GNBDUFunction=1",
                "options": "(fields=NRCellDU/attributes/cellLocalId)",
                "targetIds": ["%s"]
            }]}
        """.formatted(cmHandleId)
    }
}