2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024-2026 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the 'License');
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.integration.functional.ncmp.inventory
23 import org.apache.kafka.clients.consumer.KafkaConsumer
24 import org.onap.cps.events.LegacyEvent
25 import org.onap.cps.integration.KafkaTestContainer
26 import org.onap.cps.integration.base.CpsIntegrationSpecBase
27 import org.onap.cps.ncmp.api.NcmpResponseStatus
28 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
29 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
30 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
31 import org.onap.cps.ncmp.events.lcm.LcmEventV1
32 import org.onap.cps.ncmp.events.lcm.LcmEventV2
33 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
34 import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventProducer
35 import org.springframework.beans.factory.annotation.Autowired
36 import org.springframework.test.util.ReflectionTestUtils
38 import java.time.Duration
40 class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
42 NetworkCmProxyInventoryFacadeImpl objectUnderTest
44 KafkaConsumer<String, LegacyEvent> kafkaConsumer
47 LcmEventProducer lcmEventProducer
50 objectUnderTest = networkCmProxyInventoryFacade
51 subscribeAndClearPreviousMessages('test-group-for-update', 'ncmp-events')
55 kafkaConsumer.unsubscribe()
59 def 'Update of CM-handle with new or unchanged alternate ID succeeds.'() {
60 given: 'DMI will return modules when requested'
61 dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2']]
62 and: 'existing CM-handle with alternate ID: #oldAlternateId'
63 registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, oldAlternateId)
64 when: 'CM-handle is registered for update with new alternate ID: #newAlternateId'
65 def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: 'ch-1', alternateId: newAlternateId)
66 def dmiPluginRegistrationResponse =
67 objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate]))
68 then: 'registration gives successful response'
69 assert dmiPluginRegistrationResponse.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse('ch-1')]
70 and: 'the CM-handle has expected alternate ID'
71 assert objectUnderTest.getNcmpServiceCmHandle('ch-1').alternateId == expectedAlternateId
72 cleanup: 'deregister CM handles'
73 deregisterCmHandle(DMI1_URL, 'ch-1')
74 where: 'following alternate ids are used'
75 oldAlternateId | newAlternateId || expectedAlternateId
78 'old' | 'old' || 'old'
84 def 'Update of CM-handle with previously set alternate ID fails.'() {
85 given: 'DMI will return modules when requested'
86 dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2']]
87 and: 'existing CM-handle with alternate ID'
88 registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, 'original')
89 when: 'a CM-handle is registered for update with new alternate ID'
90 def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: 'ch-1', alternateId: 'new')
91 def dmiPluginRegistrationResponse =
92 objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate]))
93 then: 'registration gives failure response, due to cm-handle already existing'
94 assert dmiPluginRegistrationResponse.updatedCmHandles == [CmHandleRegistrationResponse.createFailureResponse('ch-1', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST)]
95 and: 'the CM-handle still has the old alternate ID'
96 assert objectUnderTest.getNcmpServiceCmHandle('ch-1').alternateId == 'original'
97 cleanup: 'deregister CM handles'
98 deregisterCmHandle(DMI1_URL, 'ch-1')
101 def 'CM Handle registration to verify changes in data producer identifier'() {
102 given: 'DMI will return modules when requested'
103 def cmHandleId = 'ch-id-for-update'
104 dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
105 when: 'a CM-handle is registered for creation'
106 def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: cmHandleId)
107 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
108 def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
109 then: 'registration gives successful response'
110 assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
111 then: 'the module sync watchdog is triggered'
112 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
113 and: 'flush and check there are 2 cm handle registration events (state transition from NONE to ADVISED and ADVISED to READY)'
114 assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2).size() == 2
115 and: 'cm handle updated with the data producer identifier'
116 def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id')
117 def dmiPluginRegistrationForUpdate = new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate])
118 def dmiPluginRegistrationResponseForUpdate = objectUnderTest.updateDmiRegistration(dmiPluginRegistrationForUpdate)
119 then: 'registration gives successful response'
120 assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
121 and: 'get the latest message'
122 def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)
123 and: 'the message has the updated data producer identifier'
124 def notificationMessages = []
125 for (def consumerRecord : consumerRecords) {
126 notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEventV1))
128 assert notificationMessages[0].event.cmHandleId.contains(cmHandleId)
129 assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-id'
130 cleanup: 'deregister CM handle'
131 deregisterCmHandle(DMI1_URL, cmHandleId)
134 def 'CM Handle registration to verify changes in data producer identifier using V2 events'() {
135 given: 'event schema version is set to v2'
136 def originalEventSchemaVersion = ReflectionTestUtils.getField(lcmEventProducer, 'eventSchemaVersion')
137 ReflectionTestUtils.setField(lcmEventProducer, 'eventSchemaVersion', 'v2')
138 and: 'DMI will return modules when requested'
139 def cmHandleId = 'ch-id-for-update-v2'
140 dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
141 when: 'a CM-handle is registered for creation'
142 def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: cmHandleId)
143 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
144 def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
145 then: 'registration gives successful response'
146 assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
147 then: 'the module sync watchdog is triggered'
148 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
149 and: 'flush and check there are 2 cm handle registration events (state transition from NONE to ADVISED and ADVISED to READY)'
150 assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2).size() == 2
151 and: 'cm handle updated with the data producer identifier'
152 def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id')
153 def dmiPluginRegistrationForUpdate = new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate])
154 def dmiPluginRegistrationResponseForUpdate = objectUnderTest.updateDmiRegistration(dmiPluginRegistrationForUpdate)
155 then: 'registration gives successful response'
156 assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
157 and: 'get the latest message'
158 def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)
159 and: 'the V2 message has the updated data producer identifier in newValues'
160 def notificationMessages = []
161 for (def consumerRecord : consumerRecords) {
162 notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEventV2))
164 assert notificationMessages[0].event.cmHandleId.contains(cmHandleId)
165 assert notificationMessages[0].event.newValues['dataProducerIdentifier'] == 'my-data-producer-id'
166 cleanup: 'restore original event schema version and deregister CM handle'
167 ReflectionTestUtils.setField(lcmEventProducer, 'eventSchemaVersion', originalEventSchemaVersion)
168 deregisterCmHandle(DMI1_URL, cmHandleId)
171 def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
172 kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
173 kafkaConsumer.subscribe([topicName])
174 kafkaConsumer.poll(Duration.ofMillis(500))