2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the 'License');
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.integration.functional.ncmp.inventory
23 import org.apache.kafka.clients.consumer.KafkaConsumer
24 import org.onap.cps.events.LegacyEvent
25 import org.onap.cps.integration.KafkaTestContainer
26 import org.onap.cps.integration.base.CpsIntegrationSpecBase
27 import org.onap.cps.ncmp.api.NcmpResponseStatus
28 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
29 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
30 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
31 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
32 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
34 import java.time.Duration
36 class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
/** Facade under test; assigned before every feature method in setup(). */
NetworkCmProxyInventoryFacadeImpl objectUnderTest

/** Kafka consumer the features read events from; assigned by subscribeAndClearPreviousMessages. */
KafkaConsumer<String, LegacyEvent> kafkaConsumer

/**
 * Per-feature fixture: points objectUnderTest at 'networkCmProxyInventoryFacade'
 * (not declared in this file, presumably provided by CpsIntegrationSpecBase - confirm)
 * and subscribes the consumer to the 'ncmp-events' topic using the
 * 'test-group-for-update' consumer group.
 */
def setup() {
    objectUnderTest = networkCmProxyInventoryFacade
    subscribeAndClearPreviousMessages('test-group-for-update', 'ncmp-events')
}

/**
 * Per-feature teardown: removes the consumer's topic subscription.
 */
def cleanup() {
    kafkaConsumer.unsubscribe()
}
/**
 * Registers CM-handle 'ch-1' with an initial alternate ID, submits an update
 * carrying another alternate ID, and checks that the update is reported as
 * successful and that the stored alternate ID equals the expected value.
 * Runs once per row of the where-table.
 */
def 'Update of CM-handle with new or unchanged alternate ID succeeds.'() {
    given: 'DMI will return modules when requested'
        dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2']]
    and: 'existing CM-handle with alternate ID: #oldAlternateId'
        registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, oldAlternateId)
    when: 'CM-handle is registered for update with new alternate ID: #newAlternateId'
        def updateRequest = new DmiPluginRegistration(
            dmiPlugin: DMI1_URL,
            updatedCmHandles: [new NcmpServiceCmHandle(cmHandleId: 'ch-1', alternateId: newAlternateId)])
        def updateResult = objectUnderTest.updateDmiRegistration(updateRequest)
    then: 'registration gives successful response'
        def expectedOutcome = [CmHandleRegistrationResponse.createSuccessResponse('ch-1')]
        assert updateResult.updatedCmHandles == expectedOutcome
    and: 'the CM-handle has expected alternate ID'
        def storedCmHandle = objectUnderTest.getNcmpServiceCmHandle('ch-1')
        assert storedCmHandle.alternateId == expectedAlternateId
    cleanup: 'deregister CM handles'
        deregisterCmHandle(DMI1_URL, 'ch-1')
    where: 'following alternate ids are used'
        oldAlternateId | newAlternateId || expectedAlternateId
        'old'          | 'old'          || 'old'
}
/**
 * Registers CM-handle 'ch-1' with alternate ID 'original', then submits an
 * update that tries to change it to 'new'. Expects a CM_HANDLE_ALREADY_EXIST
 * failure response and the stored alternate ID to remain 'original'.
 */
def 'Update of CM-handle with previously set alternate ID fails.'() {
    given: 'DMI will return modules when requested'
        dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2']]
    and: 'existing CM-handle with alternate ID'
        registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, 'original')
    when: 'a CM-handle is registered for update with new alternate ID'
        def conflictingUpdate = new DmiPluginRegistration(
            dmiPlugin: DMI1_URL,
            updatedCmHandles: [new NcmpServiceCmHandle(cmHandleId: 'ch-1', alternateId: 'new')])
        def updateResult = objectUnderTest.updateDmiRegistration(conflictingUpdate)
    then: 'registration gives failure response, due to cm-handle already existing'
        def expectedRejection = CmHandleRegistrationResponse.createFailureResponse('ch-1', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST)
        assert updateResult.updatedCmHandles == [expectedRejection]
    and: 'the CM-handle still has the old alternate ID'
        def storedCmHandle = objectUnderTest.getNcmpServiceCmHandle('ch-1')
        assert storedCmHandle.alternateId == 'original'
    cleanup: 'deregister CM handles'
        deregisterCmHandle(DMI1_URL, 'ch-1')
}
/**
 * Creates CM-handle 'ch-id-for-update', triggers the module sync watchdog and
 * expects two records on the Kafka consumer. It then updates the CM-handle with
 * data producer identifier 'my-data-producer-id' and checks that the next
 * consumed record, parsed as an LcmEvent, carries the CM-handle ID and the
 * new identifier.
 *
 * Each stimulus lives in its own 'when:' block so that 'then:' blocks only
 * hold verification.
 */
def 'CM Handle registration to verify changes in data producer identifier'() {
    given: 'DMI will return modules when requested'
        def cmHandleId = 'ch-id-for-update'
        dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
    when: 'a CM-handle is registered for creation'
        def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: cmHandleId)
        def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
        def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
    then: 'registration gives successful response'
        assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
    when: 'the module sync watchdog is triggered'
        moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
    then: 'flush and check there are 2 cm handle registration events (state transition from NONE to ADVISED and ADVISED to READY)'
        assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2).size() == 2
    when: 'cm handle updated with the data producer identifier'
        def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id')
        def dmiPluginRegistrationForUpdate = new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate])
        def dmiPluginRegistrationResponseForUpdate = objectUnderTest.updateDmiRegistration(dmiPluginRegistrationForUpdate)
    then: 'registration gives successful response'
        assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
    and: 'get the latest message'
        def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)
    and: 'the message has the updated data producer identifier'
        def notificationMessages = consumerRecords.collect { consumerRecord ->
            jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent)
        }
        // Fail with a readable message rather than a NullPointerException when nothing was consumed.
        assert !notificationMessages.isEmpty()
        assert notificationMessages[0].event.cmHandleId.contains(cmHandleId)
        assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-id'
    cleanup: 'deregister CM handle'
        deregisterCmHandle(DMI1_URL, cmHandleId)
}
/**
 * Obtains a legacy-event consumer for the given consumer group from
 * KafkaTestContainer, stores it in the 'kafkaConsumer' field, subscribes it to
 * the given topic and performs a single poll with a 500 ms timeout.
 * The poll presumably drains messages left on the topic by earlier activity,
 * as the method name suggests - confirm.
 *
 * @param consumerGroupId consumer group ID passed to KafkaTestContainer.getLegacyEventConsumer
 * @param topicName       name of the single topic to subscribe to
 * @return the result of the poll
 */
def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
    def topicsToSubscribe = [topicName]
    def initialPollTimeout = Duration.ofMillis(500)
    kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
    kafkaConsumer.subscribe(topicsToSubscribe)
    return kafkaConsumer.poll(initialPollTimeout)
}