2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the 'License');
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.integration.functional.ncmp.inventory
23 import java.time.Duration
24 import org.apache.kafka.clients.consumer.KafkaConsumer
25 import org.apache.kafka.common.serialization.StringDeserializer
26 import org.onap.cps.integration.KafkaTestContainer
27 import org.onap.cps.integration.base.CpsIntegrationSpecBase
28 import org.onap.cps.ncmp.api.NcmpResponseStatus
29 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
30 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
31 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
32 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
33 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
34 import org.onap.cps.ncmp.api.inventory.models.CmHandleState
35 import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
37 class CmHandleCreateSpec extends CpsIntegrationSpecBase {
39 NetworkCmProxyInventoryFacadeImpl objectUnderTest
40 def uniqueId = 'ch-unique-id-for-create-test'
42 static KafkaConsumer kafkaConsumer
45 objectUnderTest = networkCmProxyInventoryFacade
46 subscribeAndClearPreviousMessages()
50 kafkaConsumer.unsubscribe()
54 def 'CM Handle registration.'() {
55 given: 'DMI will return modules when requested'
56 dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
57 dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2']
59 when: 'a CM-handle is registered for creation'
60 def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId)
61 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
62 def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
64 then: 'registration gives successful response'
65 assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(uniqueId)]
67 and: 'CM-handle is initially in ADVISED state'
68 assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
70 then: 'the module sync watchdog is triggered'
71 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
73 then: 'CM-handle goes to READY state after module sync'
74 assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
76 and: 'the CM-handle has expected modules'
77 assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
79 then: 'get the latest messages'
80 def consumerRecords = getLatestConsumerRecords()
82 and: 'both converted messages are for the correct cm handle'
83 def notificationMessages = []
84 for (def consumerRecord : consumerRecords) {
85 notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent))
87 assert notificationMessages.event.cmHandleId == [ uniqueId, uniqueId ]
89 and: 'the oldest event is about the update to ADVISED state'
90 notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED'
92 and: 'the next event is about update to READY state'
93 notificationMessages[1].event.newValues.cmHandleState.value() == 'READY'
95 cleanup: 'deregister CM handle'
96 deregisterCmHandle(DMI1_URL, uniqueId)
99 def 'CM Handle goes to LOCKED state when DMI gives error during module sync.'() {
100 given: 'DMI is not available to handle requests'
101 dmiDispatcher1.isAvailable = false
103 when: 'a CM-handle is registered for creation'
104 def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
105 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
106 objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
108 and: 'the module sync watchdog is triggered'
109 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
111 then: 'CM-handle goes to LOCKED state with reason MODULE_SYNC_FAILED'
112 def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState('ch-1')
113 assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED
114 assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_SYNC_FAILED
116 and: 'CM-handle has no modules'
117 assert objectUnderTest.getYangResourcesModuleReferences('ch-1').empty
119 cleanup: 'deregister CM handle'
120 deregisterCmHandle(DMI1_URL, 'ch-1')
123 def 'Create a CM-handle with existing moduleSetTag.'() {
124 given: 'DMI will return modules when requested'
125 dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M3']]
126 and: 'existing CM-handles cm-1 with moduleSetTag "A", and cm-2 with moduleSetTag "B"'
127 registerCmHandle(DMI1_URL, 'ch-1', 'A')
128 registerCmHandle(DMI1_URL, 'ch-2', 'B')
130 when: 'a CM-handle is registered for creation with moduleSetTag "B"'
131 def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-3', moduleSetTag: 'B')
132 objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]))
134 and: 'the module sync watchdog is triggered'
135 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
137 then: 'the CM-handle goes to READY state'
138 assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState('ch-3').cmHandleState
140 and: 'the CM-handle has expected moduleSetTag'
141 assert objectUnderTest.getNcmpServiceCmHandle('ch-3').moduleSetTag == 'B'
143 and: 'the CM-handle has expected modules from module set "B": M1 and M3'
144 assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences('ch-3').moduleName.sort()
146 cleanup: 'deregister CM handles'
147 deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2', 'ch-3'])
150 def 'Create CM-handles with alternate IDs.'() {
151 given: 'DMI will return modules for all CM-handles when requested'
152 dmiDispatcher1.moduleNamesPerCmHandleId = (1..7).collectEntries{ ['ch-'+it, ['M1']] }
153 and: 'an existing CM-handle with an alternate ID'
154 registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, 'existing-alt-id')
155 and: 'an existing CM-handle with no alternate ID'
156 registerCmHandle(DMI1_URL, 'ch-2', NO_MODULE_SET_TAG, NO_ALTERNATE_ID)
158 when: 'a batch of CM-handles is registered for creation with various alternate IDs'
159 def cmHandlesToCreate = [
160 new NcmpServiceCmHandle(cmHandleId: 'ch-3', alternateId: NO_ALTERNATE_ID),
161 new NcmpServiceCmHandle(cmHandleId: 'ch-4', alternateId: 'unique-alt-id'),
162 new NcmpServiceCmHandle(cmHandleId: 'ch-5', alternateId: 'existing-alt-id'),
163 new NcmpServiceCmHandle(cmHandleId: 'ch-6', alternateId: 'duplicate-alt-id'),
164 new NcmpServiceCmHandle(cmHandleId: 'ch-7', alternateId: 'duplicate-alt-id'),
166 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
167 def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
169 then: 'registration gives expected responses'
170 assert dmiPluginRegistrationResponse.createdCmHandles.sort { it.cmHandle } == [
171 CmHandleRegistrationResponse.createSuccessResponse('ch-3'),
172 CmHandleRegistrationResponse.createSuccessResponse('ch-4'),
173 CmHandleRegistrationResponse.createFailureResponse('ch-5', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST),
174 CmHandleRegistrationResponse.createSuccessResponse('ch-6'),
175 CmHandleRegistrationResponse.createFailureResponse('ch-7', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST),
178 cleanup: 'deregister CM handles'
179 deregisterCmHandles(DMI1_URL, (1..7).collect{ 'ch-'+it })
182 def 'CM Handle retry after failed module sync.'() {
183 given: 'DMI is not initially available to handle requests'
184 dmiDispatcher1.isAvailable = false
186 when: 'CM-handles are registered for creation'
187 def cmHandlesToCreate = [new NcmpServiceCmHandle(cmHandleId: 'ch-1'), new NcmpServiceCmHandle(cmHandleId: 'ch-2')]
188 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
189 objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
191 and: 'the module sync watchdog is triggered'
192 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
194 then: 'CM-handles go to LOCKED state'
195 assert objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState == CmHandleState.LOCKED
197 when: 'DMI is available for retry'
198 dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M2']]
199 dmiDispatcher1.isAvailable = true
201 and: 'the module sync watchdog is triggered TWICE'
202 2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
204 then: 'Both CM-handles go to READY state'
205 ['ch-1', 'ch-2'].each { cmHandleId ->
206 assert objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState == CmHandleState.READY
209 and: 'Both CM-handles have expected modules'
210 ['ch-1', 'ch-2'].each { cmHandleId ->
211 assert objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() == ['M1', 'M2']
214 cleanup: 'deregister CM handles'
215 deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
218 def subscribeAndClearPreviousMessages() {
219 kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class)
220 kafkaConsumer.subscribe(['ncmp-events'])
221 kafkaConsumer.poll(Duration.ofMillis(500))
224 def getLatestConsumerRecords() {
225 def consumerRecords = []
226 def retryAttempts = 10
227 while (consumerRecords.size() < 2) {
229 consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100)))
230 if (retryAttempts == 0)
233 return consumerRecords