83f91226be89fa0635dc0f036fd8ca4c6ae704ac
[cps.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the 'License');
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.integration.functional.ncmp
22
23 import io.cloudevents.CloudEvent
24 import io.cloudevents.kafka.CloudEventDeserializer
25 import java.time.Duration
26 import org.apache.kafka.clients.consumer.KafkaConsumer
27 import org.onap.cps.integration.KafkaTestContainer
28 import org.onap.cps.integration.base.CpsIntegrationSpecBase
29 import org.onap.cps.ncmp.events.async1_0_0.Data
30 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
31 import org.onap.cps.ncmp.events.async1_0_0.Response
32 import org.springframework.http.MediaType
33 import spock.util.concurrent.PollingConditions
34
35 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
36 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status
37
/**
 * Integration tests for batch (data operation) requests posted to '/ncmp/v1/data' with a client topic.
 *
 * Each feature runs against two CM-handles registered on DMI1_URL in setup():
 * 'ch-1' (registered with the regular registerCmHandle helper) and 'not-ready-ch'
 * (registered with registerCmHandleWithoutWaitForReady). Error responses are read back
 * from the 'legacy-batch-topic' Kafka topic as cloud events.
 */
class LegacyBatchDataOperationSpec extends CpsIntegrationSpecBase {

    // Consumer subscribed to 'legacy-batch-topic'; created in setup() and closed in cleanup().
    KafkaConsumer kafkaConsumer

    /**
     * Creates and subscribes the Kafka consumer, then registers the CM-handles used by the features.
     */
    def setup() {
        kafkaConsumer = KafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
        kafkaConsumer.subscribe(['legacy-batch-topic'])
        // Result is deliberately discarded; presumably this initial poll lets the subscription settle
        // (and drops older records) before a feature sends its request - TODO confirm.
        kafkaConsumer.poll(Duration.ofMillis(500))
        dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
        registerCmHandle(DMI1_URL, 'ch-1', 'tagA', 'alt-1')
        registerCmHandleWithoutWaitForReady(DMI1_URL, 'not-ready-ch', NO_MODULE_SET_TAG, 'alt-3')
    }

    /**
     * De-registers the CM-handles created in setup() and releases the Kafka consumer.
     */
    def cleanup() {
        try {
            deregisterCmHandles(DMI1_URL, ['ch-1', 'not-ready-ch'])
        } finally {
            // Always release the consumer, even when de-registration fails,
            // so a failing feature does not leak an open Kafka consumer.
            kafkaConsumer.unsubscribe()
            kafkaConsumer.close()
        }
    }

    /**
     * Posts a read operation for the ready CM-handle 'ch-1' and checks the request captured by dmiDispatcher1.
     */
    def 'Batch pass-through data operation is forwarded to DMI plugin.'() {
        given: 'a request body containing a data read operation for an existing and ready CM-handle'
            def dataOperationRequestBody = makeDataOperationRequestBody('ch-1')

        when: 'a pass-through batch data request is sent to NCMP is successful'
            mvc.perform(post('/ncmp/v1/data')
                    .queryParam('topic', 'legacy-batch-topic')
                    .contentType(MediaType.APPLICATION_JSON)
                    .content(dataOperationRequestBody)
            ).andExpect(status().is2xxSuccessful())

        then: 'DMI will receive the async request'
            // The request reaches the DMI asynchronously, so poll (up to 2 seconds) until it has been captured.
            new PollingConditions().within(2, () -> {
                assert dmiDispatcher1.receivedDataOperationRequest.isEmpty() == false
            })
        and: 'the request has one operation'
            assert dmiDispatcher1.receivedDataOperationRequest.operations.size() == 1
            def operation = dmiDispatcher1.receivedDataOperationRequest.operations[0]
        and: 'the operation has the expected ID'
            assert operation.operationId == '12'
        and: 'the operation is for the expected CM-handles'
            assert operation.cmHandles.id == ['ch-1']
    }

    /**
     * Posts a read operation for a CM-handle that cannot be served and checks that exactly one cloud event
     * carrying the expected error status is published on 'legacy-batch-topic'.
     */
    def 'Batch pass-through data operation reports errors on kafka topic.'() {
        given: 'a request body containing a data read operation for #cmHandleId'
            def dataOperationRequestBody = makeDataOperationRequestBody(cmHandleId)

        when: 'a pass-through batch data request is sent to NCMP specifying a kafka topic is successful'
            mvc.perform(post('/ncmp/v1/data')
                    .queryParam('topic', 'legacy-batch-topic')
                    .contentType(MediaType.APPLICATION_JSON)
                    .content(dataOperationRequestBody))
                    .andExpect(status().is2xxSuccessful())

        then: 'there is one kafka message'
            def consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000))
            assert consumerRecords.size() == 1

        and: 'it is a cloud event'
            assert consumerRecords[0].value() instanceof CloudEvent

        and: 'it contains the data operation event with the expected error status'
            // The cloud event payload is JSON; convert it to the event type before comparing.
            def jsonData = new String(consumerRecords[0].value().data.toBytes())
            def dataOperationEvent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent)
            assert dataOperationEvent == new DataOperationEvent(data:
                    new Data(responses: [
                            new Response(
                                operationId: 12,
                                resourceIdentifier: 'ManagedElement=NRNode1/GNBDUFunction=1',
                                options: '(fields=NRCellDU/attributes/cellLocalId)',
                                ids: [cmHandleId],
                                statusCode: expectedStatusCode,
                                statusMessage: expectedStatusMessage,
                                result: null),
                    ]))

        where:
            scenario              | cmHandleId     || expectedStatusCode | expectedStatusMessage
            'CM handle not ready' | 'not-ready-ch' || 101                | 'cm handle(s) not ready'
            // FIXME BUG CPS-2769: CM handle not found causes batch to fail
            // 'CM handle not found' | 'not-found-ch' || 100                | 'cm handle reference(s) not found'
    }

    /**
     * Builds the JSON request body for a single 'read' data operation (operationId '12')
     * on the passthrough-operational datastore.
     *
     * @param cmHandleId the CM-handle reference placed in the operation's 'targetIds' array
     * @return the request body as a JSON string
     */
    def makeDataOperationRequestBody(cmHandleId) {
        return """
               {
                    "operations": [
                        {
                            "operation": "read",
                            "operationId": "12",
                            "datastore": "ncmp-datastore:passthrough-operational",
                            "resourceIdentifier": "ManagedElement=NRNode1/GNBDUFunction=1",
                            "options": "(fields=NRCellDU/attributes/cellLocalId)",
                            "targetIds": ["%s"]
                        }
                    ]
                }
        """.formatted(cmHandleId)
    }

}