2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the 'License');
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.integration.functional.ncmp.inventory
23 import org.apache.kafka.clients.consumer.KafkaConsumer
24 import org.onap.cps.events.LegacyEvent
25 import org.onap.cps.integration.KafkaTestContainer
26 import org.onap.cps.integration.base.CpsIntegrationSpecBase
27 import org.onap.cps.ncmp.api.NcmpResponseStatus
28 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
29 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
30 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
31 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
32 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
34 import java.time.Duration
36 class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
38 NetworkCmProxyInventoryFacadeImpl objectUnderTest
40 KafkaConsumer<String, LegacyEvent> kafkaConsumer
44 objectUnderTest = networkCmProxyInventoryFacade
45 subscribeAndClearPreviousMessages('test-group-for-update', 'ncmp-events')
49 kafkaConsumer.unsubscribe()
53 def 'Update of CM-handle with new or unchanged alternate ID succeeds.'() {
54 given: 'DMI will return modules when requested'
55 dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2']]
56 and: "existing CM-handle with alternate ID: $oldAlternateId"
57 registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, oldAlternateId)
59 when: "CM-handle is registered for update with new alternate ID: $newAlternateId"
60 def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: 'ch-1', alternateId: newAlternateId)
61 def dmiPluginRegistrationResponse =
62 objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate]))
64 then: 'registration gives successful response'
65 assert dmiPluginRegistrationResponse.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse('ch-1')]
67 and: 'the CM-handle has expected alternate ID'
68 assert objectUnderTest.getNcmpServiceCmHandle('ch-1').alternateId == expectedAlternateId
70 cleanup: 'deregister CM handles'
71 deregisterCmHandle(DMI1_URL, 'ch-1')
74 oldAlternateId | newAlternateId || expectedAlternateId
77 'old' | 'old' || 'old'
83 def 'Update of CM-handle with previously set alternate ID fails.'() {
84 given: 'DMI will return modules when requested'
85 dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2']]
86 and: 'existing CM-handle with alternate ID'
87 registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, 'original')
89 when: 'a CM-handle is registered for update with new alternate ID'
90 def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: 'ch-1', alternateId: 'new')
91 def dmiPluginRegistrationResponse =
92 objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate]))
94 then: 'registration gives failure response, due to cm-handle already existing'
95 assert dmiPluginRegistrationResponse.updatedCmHandles == [CmHandleRegistrationResponse.createFailureResponse('ch-1', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST)]
97 and: 'the CM-handle still has the old alternate ID'
98 assert objectUnderTest.getNcmpServiceCmHandle('ch-1').alternateId == 'original'
100 cleanup: 'deregister CM handles'
101 deregisterCmHandle(DMI1_URL, 'ch-1')
104 def 'CM Handle registration to verify changes in data producer identifier'() {
105 given: 'DMI will return modules when requested'
106 def cmHandleId = 'ch-id-for-update'
107 dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
109 when: 'a CM-handle is registered for creation'
111 def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: cmHandleId)
112 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
113 def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
115 then: 'registration gives successful response'
116 assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
118 then: 'the module sync watchdog is triggered'
119 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
121 and: 'flush the latest cm handle registration events( state transition from NONE to ADVISED and ADVISED to READY)'
122 getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
124 and: 'cm handle updated with the data producer identifier'
125 def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id')
126 def dmiPluginRegistrationForUpdate = new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate])
127 def dmiPluginRegistrationResponseForUpdate = objectUnderTest.updateDmiRegistration(dmiPluginRegistrationForUpdate)
129 then: 'registration gives successful response'
130 assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
132 and: 'get the latest message'
133 def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)
135 and: 'the message has the updated data producer identifier'
136 def notificationMessages = []
137 for (def consumerRecord : consumerRecords) {
138 notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent))
140 assert notificationMessages[0].event.cmHandleId.contains(cmHandleId)
141 assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-id'
143 cleanup: 'deregister CM handle'
144 deregisterCmHandle(DMI1_URL, cmHandleId)
147 def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
148 kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
149 kafkaConsumer.subscribe([topicName])
150 kafkaConsumer.poll(Duration.ofMillis(500))