2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the 'License');
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.integration.functional.ncmp.inventory
24 import org.apache.kafka.clients.consumer.KafkaConsumer
25 import org.onap.cps.events.LegacyEvent
26 import org.onap.cps.integration.KafkaTestContainer
27 import org.onap.cps.integration.base.CpsIntegrationSpecBase
28 import org.onap.cps.ncmp.api.NcmpResponseStatus
29 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
30 import org.onap.cps.ncmp.api.inventory.models.CmHandleState
31 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
32 import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
33 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
34 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
35 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
36 import spock.util.concurrent.PollingConditions
38 import java.time.Duration
40 class CmHandleCreateSpec extends CpsIntegrationSpecBase {
42 NetworkCmProxyInventoryFacadeImpl objectUnderTest
43 def uniqueId = 'my-new-cm-handle'
45 KafkaConsumer<String, LegacyEvent> kafkaConsumer
48 objectUnderTest = networkCmProxyInventoryFacade
49 subscribeAndClearPreviousMessages('test-group', 'ncmp-events')
50 clearPreviousInstrumentation()
54 kafkaConsumer.unsubscribe()
58 def 'CM Handle registration.'() {
59 given: 'DMI will return modules when requested'
60 dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
61 dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2']
62 when: 'a CM-handle is registered for creation'
63 def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId,
66 dataProducerIdentifier: 'prod1',
67 publicProperties: [color:'green'])
68 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
69 def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
70 then: 'registration gives successful response'
71 assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(uniqueId)]
72 and: 'CM-handle is initially in ADVISED state'
73 assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
74 then: 'the module sync watchdog is triggered'
75 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
76 then: 'CM-handle goes to READY state after module sync'
77 assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
78 and: 'the CM-handle has expected modules'
79 assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
80 then: 'get the last 2 messages and related headers'
81 def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
84 consumerRecords.each { consumerRecord ->
85 messages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent))
86 headerMaps.add(getHeadersAsMap(consumerRecord))
88 and: 'both messages have the correct common attributes (that did not change)'
89 assert messages.event.every {
90 it.cmHandleId == uniqueId &&
91 it.alternateId == 'fdn1' &&
92 it.moduleSetTag == 'tag1' &&
93 it.dataProducerIdentifier == 'prod1'
95 and: 'the header fields are populated correctly and stored as kafka headers too'
96 validateEventHeaders(messages[0], headerMaps[0], 'create')
97 validateEventHeaders(messages[1], headerMaps[1], 'update')
98 and: 'the first lcm event has no old values and the initial attributes as new values state ADVISED'
99 with(messages[0].event) {
100 assert oldValues == null
101 assert newValues.cmHandleState.value() == 'ADVISED'
102 assert newValues.dataSyncEnabled == null
103 assert newValues.cmHandleProperties[0] == [color:'green']
105 and: 'the next event is about update to READY state (new value), the old value for state is ADVISED'
106 assert messages[1].event.oldValues.cmHandleState.value() == 'ADVISED'
107 assert messages[1].event.newValues.cmHandleState.value() == 'READY'
108 and: 'the cm handle (public) properties have not changed and are therefore null for old and new values'
109 assert messages[1].event.oldValues.cmHandleProperties == null
110 assert messages[1].event.newValues.cmHandleProperties == null
111 and: 'the data sync flag goes from undefined to false'
112 assert messages[1].event.oldValues.dataSyncEnabled == null
113 assert messages[1].event.newValues.dataSyncEnabled == false
114 and: 'there are no more messages to be read'
115 assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1).size() == 0
116 and: 'instrumentation has recorded 2 events (null > ADVISED, ADVISED > READY)'
117 new PollingConditions().within(5) {
118 assert countLcmEventTimerInvocations() == 2
120 then: 'deregister CM handle'
121 deregisterCmHandle(DMI1_URL, uniqueId)
122 and: 'instrumentation has recorded 2 more events (READY > DELETING, DELETING > null)'
123 new PollingConditions().within(5) {
124 assert countLcmEventTimerInvocations() == 2 + 2
128 def 'CM Handle registration with DMI error during module sync.'() {
129 given: 'DMI is not available to handle requests'
130 dmiDispatcher1.isAvailable = false
131 when: 'a CM-handle is registered for creation'
132 def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
133 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
134 objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
135 and: 'the module sync watchdog is triggered'
136 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
137 then: 'CM-handle goes to LOCKED state with reason MODULE_SYNC_FAILED'
138 def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState('ch-1')
139 assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED
140 assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_SYNC_FAILED
141 and: 'CM-handle has no modules'
142 assert objectUnderTest.getYangResourcesModuleReferences('ch-1').empty
143 cleanup: 'deregister CM handle'
144 deregisterCmHandle(DMI1_URL, 'ch-1')
147 def 'Create a CM-handle with existing moduleSetTag.'() {
148 given: 'DMI will return modules when requested'
149 dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M3']]
150 and: 'existing CM-handles cm-1 with moduleSetTag "A", and cm-2 with moduleSetTag "B"'
151 registerCmHandle(DMI1_URL, 'ch-1', 'A')
152 registerCmHandle(DMI1_URL, 'ch-2', 'B')
153 when: 'a CM-handle is registered for creation with moduleSetTag "B"'
154 def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-3', moduleSetTag: 'B')
155 objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]))
156 and: 'the module sync watchdog is triggered'
157 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
158 then: 'the CM-handle goes to READY state'
159 assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState('ch-3').cmHandleState
160 and: 'the CM-handle has expected moduleSetTag'
161 assert objectUnderTest.getNcmpServiceCmHandle('ch-3').moduleSetTag == 'B'
162 and: 'the CM-handle has expected modules from module set "B": M1 and M3'
163 assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences('ch-3').moduleName.sort()
164 cleanup: 'deregister CM handles'
165 deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2', 'ch-3'])
168 def 'Create CM-handles with alternate IDs.'() {
169 given: 'DMI will return modules for all CM-handles when requested'
170 dmiDispatcher1.moduleNamesPerCmHandleId = (1..7).collectEntries{ ['ch-'+it, ['M1']] }
171 and: 'an existing CM-handle with an alternate ID'
172 registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, 'existing-alt-id')
173 and: 'an existing CM-handle with no alternate ID'
174 registerCmHandle(DMI1_URL, 'ch-2', NO_MODULE_SET_TAG, NO_ALTERNATE_ID)
175 when: 'a batch of CM-handles is registered for creation with various alternate IDs'
176 def cmHandlesToCreate = [
177 new NcmpServiceCmHandle(cmHandleId: 'ch-3', alternateId: NO_ALTERNATE_ID),
178 new NcmpServiceCmHandle(cmHandleId: 'ch-4', alternateId: 'unique-alt-id'),
179 new NcmpServiceCmHandle(cmHandleId: 'ch-5', alternateId: 'existing-alt-id'),
180 new NcmpServiceCmHandle(cmHandleId: 'ch-6', alternateId: 'duplicate-alt-id'),
181 new NcmpServiceCmHandle(cmHandleId: 'ch-7', alternateId: 'duplicate-alt-id'),
183 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
184 def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
185 then: 'registration gives expected responses'
186 assert dmiPluginRegistrationResponse.createdCmHandles.sort { it.cmHandle } == [
187 CmHandleRegistrationResponse.createSuccessResponse('ch-3'),
188 CmHandleRegistrationResponse.createSuccessResponse('ch-4'),
189 CmHandleRegistrationResponse.createFailureResponse('ch-5', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST),
190 CmHandleRegistrationResponse.createSuccessResponse('ch-6'),
191 CmHandleRegistrationResponse.createFailureResponse('ch-7', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST),
193 cleanup: 'deregister CM handles'
194 deregisterCmHandles(DMI1_URL, (1..7).collect{ 'ch-'+it })
197 def 'CM Handle retry after failed module sync.'() {
198 given: 'DMI is not initially available to handle requests'
199 dmiDispatcher1.isAvailable = false
200 when: 'CM-handles are registered for creation'
201 def cmHandlesToCreate = [new NcmpServiceCmHandle(cmHandleId: 'ch-1'), new NcmpServiceCmHandle(cmHandleId: 'ch-2')]
202 def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
203 objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
204 and: 'the module sync watchdog is triggered'
205 moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
206 then: 'CM-handles go to LOCKED state'
207 assert objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState == CmHandleState.LOCKED
208 when: 'DMI is available for retry'
209 dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M2']]
210 dmiDispatcher1.isAvailable = true
211 and: 'the module sync watchdog is triggered TWICE'
212 2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
213 then: 'Both CM-handles go to READY state'
214 ['ch-1', 'ch-2'].each { cmHandleId ->
215 assert objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState == CmHandleState.READY
217 and: 'Both CM-handles have expected modules'
218 ['ch-1', 'ch-2'].each { cmHandleId ->
219 assert objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() == ['M1', 'M2']
221 cleanup: 'deregister CM handles'
222 deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
225 def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
226 kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
227 kafkaConsumer.subscribe([topicName])
228 kafkaConsumer.poll(Duration.ofMillis(500))
231 def getHeadersAsMap(consumerRecord) {
232 def headersAsMap = [:]
233 consumerRecord.headers().each { header ->
234 def value = (new String((byte[]) header.value())).substring(7) // The raw header is prefixed with encoded type
235 headersAsMap.put(header.key(), value)
240 def validateEventHeaders(message, headerAsMap, expectedEventType) {
242 assert UUID.fromString(eventId) != null
243 assert headerAsMap.get('eventId') == eventId
244 assert eventCorrelationId == uniqueId
245 assert headerAsMap.get('eventCorrelationId') == eventCorrelationId
246 assert timestampIsVeryRecent(eventTime)
247 assert headerAsMap.get('eventTime') == eventTime
248 assert eventSource == 'org.onap.ncmp'
249 assert headerAsMap.get('eventSource') == eventSource
250 assert eventType == 'org.onap.ncmp.cmhandle-lcm-event.'+expectedEventType
251 assert headerAsMap.get('eventType') == eventType
252 assert eventSchema == 'org.onap.ncmp:cmhandle-lcm-event'
253 assert headerAsMap.get('eventSchema') == eventSchema
254 assert eventSchemaVersion == '1.0'
255 assert headerAsMap.get('eventSchemaVersion') == eventSchemaVersion
260 def countLcmEventTimerInvocations() {
261 def totalCountForAllTagCombinations = 0
262 for (def timer : meterRegistry.get('cps.ncmp.lcm.events.send').timers()) {
263 totalCountForAllTagCombinations = totalCountForAllTagCombinations + timer.count()
265 return totalCountForAllTagCombinations