8f84c5ff2dc5bfa0357115b4d64693e17f8f0330
[cps.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2024-2026 OpenInfra Foundation Europe. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the 'License');
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.integration.functional.ncmp.inventory
22
23 import org.apache.kafka.clients.consumer.KafkaConsumer
24 import org.onap.cps.events.LegacyEvent
25 import org.onap.cps.integration.KafkaTestContainer
26 import org.onap.cps.integration.base.CpsIntegrationSpecBase
27 import org.onap.cps.ncmp.api.NcmpResponseStatus
28 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
29 import org.onap.cps.ncmp.api.inventory.models.CmHandleState
30 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
31 import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
32 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
33 import org.onap.cps.ncmp.events.lcm.LcmEventV1
34 import org.onap.cps.ncmp.events.lcm.LcmEventV2
35 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
36 import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventProducer
37 import org.springframework.beans.factory.annotation.Autowired
38 import org.springframework.test.util.ReflectionTestUtils
39 import spock.util.concurrent.PollingConditions
40
41 import java.time.Duration
42
43 class CmHandleCreateSpec extends CpsIntegrationSpecBase {
44
45     NetworkCmProxyInventoryFacadeImpl objectUnderTest
46     def uniqueId = 'my-new-cm-handle'
47
48     KafkaConsumer<String, LegacyEvent> kafkaConsumer
49
50     @Autowired
51     LcmEventProducer lcmEventProducer
52
53     def setup() {
54         objectUnderTest = networkCmProxyInventoryFacade
55         subscribeAndClearPreviousMessages('test-group', 'ncmp-events')
56         clearPreviousInstrumentation()
57     }
58
59     def cleanup() {
60         kafkaConsumer.unsubscribe()
61         kafkaConsumer.close()
62     }
63
64     def 'CM Handle registration.'() {
65         given: 'DMI will return modules when requested'
66             dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
67             dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2']
68         when: 'a CM-handle is registered for creation'
69             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId,
70                                                            alternateId: 'fdn1',
71                                                            moduleSetTag: 'tag1',
72                                                            dataProducerIdentifier: 'prod1',
73                                                            publicProperties: [color:'green'])
74             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
75             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
76         then: 'registration gives successful response'
77             assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(uniqueId)]
78         and: 'CM-handle is initially in ADVISED state'
79             assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
80         then: 'the module sync watchdog is triggered'
81             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
82         then: 'CM-handle goes to READY state after module sync'
83             assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
84         and: 'the CM-handle has expected modules'
85             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
86         then: 'get the last 2 messages and related headers'
87             def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
88             def messages = []
89             def headerMaps = []
90             consumerRecords.each { consumerRecord ->
91                 messages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEventV1))
92                 headerMaps.add(getHeadersAsMap(consumerRecord))
93             }
94         and: 'both messages have the correct common attributes (that did not change)'
95             assert messages.event.every {
96                 it.cmHandleId == uniqueId &&
97                 it.alternateId == 'fdn1' &&
98                 it.moduleSetTag == 'tag1' &&
99                 it.dataProducerIdentifier == 'prod1'
100             }
101         and: 'the header fields are populated correctly and stored as kafka headers too'
102             validateEventHeaders(messages[0], headerMaps[0], 'create')
103             validateEventHeaders(messages[1], headerMaps[1], 'update')
104         and: 'the first lcm event has no old values and the initial attributes as new values state ADVISED'
105             with(messages[0].event) {
106                 assert oldValues == null
107                 assert newValues.cmHandleState.value() == 'ADVISED'
108                 assert newValues.dataSyncEnabled == null
109                 assert newValues.cmHandleProperties == [[color:'green']]
110             }
111         and: 'the next event is about update to READY state (new value), the old value for state is ADVISED'
112             assert messages[1].event.oldValues.cmHandleState.value() == 'ADVISED'
113             assert messages[1].event.newValues.cmHandleState.value() == 'READY'
114         and: 'the cm handle (public) properties have not changed and are therefore null for old and new values'
115             assert messages[1].event.oldValues.cmHandleProperties == null
116             assert messages[1].event.newValues.cmHandleProperties == null
117         and: 'the data sync flag goes from undefined to false'
118             assert messages[1].event.oldValues.dataSyncEnabled == null
119             assert messages[1].event.newValues.dataSyncEnabled == false
120         and: 'there are no more messages to be read'
121             assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1).size() == 0
122         and: 'instrumentation has recorded 2 events (null > ADVISED, ADVISED > READY)'
123             new PollingConditions().within(5) {
124                 assert countLcmEventTimerInvocations() == 2
125             }
126         then: 'deregister CM handle'
127             deregisterCmHandle(DMI1_URL, uniqueId)
128         and: 'instrumentation has recorded 2 more events (READY > DELETING, DELETING > null)'
129             new PollingConditions().within(5) {
130                 assert countLcmEventTimerInvocations() == 2 + 2
131             }
132     }
133
134     def 'CM Handle registration using V2 events.'() {
135         given: 'event schema version is set to v2'
136             def originalEventSchemaVersion = ReflectionTestUtils.getField(lcmEventProducer, 'eventSchemaVersion')
137             ReflectionTestUtils.setField(lcmEventProducer, 'eventSchemaVersion', 'v2')
138         and: 'DMI will return modules when requested'
139             def uniqueIdV2 = 'my-new-cm-handle-v2'
140             dmiDispatcher1.moduleNamesPerCmHandleId[uniqueIdV2] = ['M1', 'M2']
141         when: 'a CM-handle is registered for creation'
142             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueIdV2,
143                                                            alternateId: 'fdn1',
144                                                            moduleSetTag: 'tag1',
145                                                            dataProducerIdentifier: 'prod1',
146                                                            publicProperties: [color:'green'])
147             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
148             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
149         then: 'registration gives successful response'
150             assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(uniqueIdV2)]
151         and: 'CM-handle is initially in ADVISED state'
152             assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueIdV2).cmHandleState
153         then: 'the module sync watchdog is triggered'
154             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
155         then: 'CM-handle goes to READY state after module sync'
156             assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueIdV2).cmHandleState
157         and: 'the CM-handle has expected modules'
158             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueIdV2).moduleName.sort()
159         then: 'get the last 2 messages'
160             def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
161             def messages = []
162             consumerRecords.each { consumerRecord ->
163                 messages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEventV2))
164             }
165         and: 'both messages have the correct cmHandleId'
166             assert messages.event.every { it.cmHandleId == uniqueIdV2 }
167         and: 'the first lcm event has no old values and the initial attributes as new values state ADVISED'
168             with(messages[0].event) {
169                 assert oldValues == null
170                 assert newValues['cmHandleState'] == 'ADVISED'
171                 assert newValues['dataSyncEnabled'] == null
172                 assert newValues['cmHandleProperties'] == [color:'green']
173                 assert newValues['alternateId'] == 'fdn1'
174                 assert newValues['moduleSetTag'] == 'tag1'
175                 assert newValues['dataProducerIdentifier'] == 'prod1'
176             }
177         and: 'the next event is about update to READY state (new value), the old value for state is ADVISED'
178             assert messages[1].event.oldValues['cmHandleState'] == 'ADVISED'
179             assert messages[1].event.newValues['cmHandleState'] == 'READY'
180         and: 'the cm handle (public) properties have not changed and are therefore null for old and new values'
181             assert messages[1].event.oldValues['cmHandleProperties'] == null
182             assert messages[1].event.newValues['cmHandleProperties'] == null
183         and: 'the data sync flag goes from undefined to false'
184             assert messages[1].event.oldValues['dataSyncEnabled'] == null
185             assert messages[1].event.newValues['dataSyncEnabled'] == false
186         and: 'alternateId, moduleSetTag, dataProducerIdentifier have not changed and are therefore null for old and new values'
187             assert messages[1].event.oldValues['alternateId'] == null
188             assert messages[1].event.newValues['alternateId'] == null
189             assert messages[1].event.oldValues['moduleSetTag'] == null
190             assert messages[1].event.newValues['moduleSetTag'] == null
191             assert messages[1].event.oldValues['dataProducerIdentifier'] == null
192             assert messages[1].event.newValues['dataProducerIdentifier'] == null
193         cleanup: 'restore original event schema version and deregister CM handle'
194             ReflectionTestUtils.setField(lcmEventProducer, 'eventSchemaVersion', originalEventSchemaVersion)
195             deregisterCmHandle(DMI1_URL, uniqueIdV2)
196     }
197
198     def 'CM Handle registration with DMI error during module sync.'() {
199         given: 'DMI is not available to handle requests'
200             dmiDispatcher1.isAvailable = false
201         when: 'a CM-handle is registered for creation'
202             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
203             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
204             objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
205         and: 'the module sync watchdog is triggered'
206             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
207         then: 'CM-handle goes to LOCKED state with reason MODULE_SYNC_FAILED'
208             def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState('ch-1')
209             assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED
210             assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_SYNC_FAILED
211         and: 'CM-handle has no modules'
212             assert objectUnderTest.getYangResourcesModuleReferences('ch-1').empty
213         cleanup: 'deregister CM handle'
214             deregisterCmHandle(DMI1_URL, 'ch-1')
215     }
216
217     def 'Create a CM-handle with existing moduleSetTag.'() {
218         given: 'DMI will return modules when requested'
219             dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M3']]
220         and: 'existing CM-handles cm-1 with moduleSetTag "A", and cm-2 with moduleSetTag "B"'
221             registerCmHandle(DMI1_URL, 'ch-1', 'A')
222             registerCmHandle(DMI1_URL, 'ch-2', 'B')
223         when: 'a CM-handle is registered for creation with moduleSetTag "B"'
224             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-3', moduleSetTag: 'B')
225             objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]))
226         and: 'the module sync watchdog is triggered'
227             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
228         then: 'the CM-handle goes to READY state'
229             assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState('ch-3').cmHandleState
230         and: 'the CM-handle has expected moduleSetTag'
231             assert objectUnderTest.getNcmpServiceCmHandle('ch-3').moduleSetTag == 'B'
232         and: 'the CM-handle has expected modules from module set "B": M1 and M3'
233             assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences('ch-3').moduleName.sort()
234         cleanup: 'deregister CM handles'
235             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2', 'ch-3'])
236     }
237
238     def 'Create CM-handles with alternate IDs.'() {
239         given: 'DMI will return modules for all CM-handles when requested'
240             dmiDispatcher1.moduleNamesPerCmHandleId = (1..7).collectEntries{ ['ch-'+it, ['M1']] }
241         and: 'an existing CM-handle with an alternate ID'
242             registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, 'existing-alt-id')
243         and: 'an existing CM-handle with no alternate ID'
244             registerCmHandle(DMI1_URL, 'ch-2', NO_MODULE_SET_TAG, NO_ALTERNATE_ID)
245         when: 'a batch of CM-handles is registered for creation with various alternate IDs'
246             def cmHandlesToCreate = [
247                 new NcmpServiceCmHandle(cmHandleId: 'ch-3', alternateId: NO_ALTERNATE_ID),
248                 new NcmpServiceCmHandle(cmHandleId: 'ch-4', alternateId: 'unique-alt-id'),
249                 new NcmpServiceCmHandle(cmHandleId: 'ch-5', alternateId: 'existing-alt-id'),
250                 new NcmpServiceCmHandle(cmHandleId: 'ch-6', alternateId: 'duplicate-alt-id'),
251                 new NcmpServiceCmHandle(cmHandleId: 'ch-7', alternateId: 'duplicate-alt-id'),
252             ]
253             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
254             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
255         then: 'registration gives expected responses'
256             assert dmiPluginRegistrationResponse.createdCmHandles.sort { it.cmHandle } == [
257                 CmHandleRegistrationResponse.createSuccessResponse('ch-3'),
258                 CmHandleRegistrationResponse.createSuccessResponse('ch-4'),
259                 CmHandleRegistrationResponse.createFailureResponse('ch-5', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST),
260                 CmHandleRegistrationResponse.createSuccessResponse('ch-6'),
261                 CmHandleRegistrationResponse.createFailureResponse('ch-7', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST),
262             ]
263         cleanup: 'deregister CM handles'
264             deregisterCmHandles(DMI1_URL, (1..7).collect{ 'ch-'+it })
265     }
266
267     def 'CM Handle retry after failed module sync.'() {
268         given: 'DMI is not initially available to handle requests'
269             dmiDispatcher1.isAvailable = false
270         when: 'CM-handles are registered for creation'
271             def cmHandlesToCreate = [new NcmpServiceCmHandle(cmHandleId: 'ch-1'), new NcmpServiceCmHandle(cmHandleId: 'ch-2')]
272             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
273             objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
274         and: 'the module sync watchdog is triggered'
275             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
276         then: 'CM-handles go to LOCKED state'
277             assert objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState == CmHandleState.LOCKED
278         when: 'DMI is available for retry'
279             dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M2']]
280             dmiDispatcher1.isAvailable = true
281         and: 'the module sync watchdog is triggered TWICE'
282             2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
283         then: 'Both CM-handles go to READY state'
284             ['ch-1', 'ch-2'].each { cmHandleId ->
285                 assert objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState == CmHandleState.READY
286             }
287         and: 'Both CM-handles have expected modules'
288             ['ch-1', 'ch-2'].each { cmHandleId ->
289                 assert objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() == ['M1', 'M2']
290             }
291         cleanup: 'deregister CM handles'
292             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
293     }
294
295     def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
296         kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
297         kafkaConsumer.subscribe([topicName])
298         kafkaConsumer.poll(Duration.ofMillis(500))
299     }
300
301     def getHeadersAsMap(consumerRecord) {
302         def headersAsMap = [:]
303         consumerRecord.headers().each { header ->
304             def value = (new String((byte[]) header.value())).substring(7) // The raw header is prefixed with encoded type
305             headersAsMap.put(header.key(), value)
306         }
307         return headersAsMap
308     }
309
310     def validateEventHeaders(message, headerAsMap, expectedEventType) {
311         with(message) {
312             assert UUID.fromString(eventId) != null
313             assert headerAsMap.get('eventId') == eventId
314             assert eventCorrelationId == uniqueId
315             assert headerAsMap.get('eventCorrelationId') == eventCorrelationId
316             assert timestampIsVeryRecent(eventTime)
317             assert headerAsMap.get('eventTime') == eventTime
318             assert eventSource == 'org.onap.ncmp'
319             assert headerAsMap.get('eventSource') == eventSource
320             assert eventType == 'org.onap.ncmp.cmhandle-lcm-event.'+expectedEventType
321             assert headerAsMap.get('eventType') == eventType
322             assert eventSchema == 'org.onap.ncmp:cmhandle-lcm-event'
323             assert headerAsMap.get('eventSchema') == eventSchema
324             assert eventSchemaVersion == '1.0'
325             assert headerAsMap.get('eventSchemaVersion') == eventSchemaVersion
326         }
327         return true
328     }
329
330     def countLcmEventTimerInvocations() {
331         def totalCountForAllTagCombinations = 0
332         for (def timer : meterRegistry.get('cps.ncmp.lcm.events.send').timers()) {
333             totalCountForAllTagCombinations = totalCountForAllTagCombinations + timer.count()
334         }
335         return totalCountForAllTagCombinations
336     }
337
338 }