15f3a487b4eecd06e38ad57a98a7f90f852ee0a0
[cps.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (C) 2024-2026 OpenInfra Foundation Europe. All rights reserved.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the 'License');
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.integration.functional.ncmp.inventory
22
23 import org.apache.kafka.clients.consumer.KafkaConsumer
24 import org.onap.cps.events.LegacyEvent
25 import org.onap.cps.integration.KafkaTestContainer
26 import org.onap.cps.integration.base.CpsIntegrationSpecBase
27 import org.onap.cps.ncmp.api.NcmpResponseStatus
28 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
29 import org.onap.cps.ncmp.api.inventory.models.CmHandleState
30 import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
31 import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
32 import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
33 import org.onap.cps.ncmp.events.lcm.LcmEventV1
34 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
35 import spock.util.concurrent.PollingConditions
36
37 import java.time.Duration
38
39 class CmHandleCreateSpec extends CpsIntegrationSpecBase {
40
41     NetworkCmProxyInventoryFacadeImpl objectUnderTest
42     def uniqueId = 'my-new-cm-handle'
43
44     KafkaConsumer<String, LegacyEvent> kafkaConsumer
45
46     def setup() {
47         objectUnderTest = networkCmProxyInventoryFacade
48         subscribeAndClearPreviousMessages('test-group', 'ncmp-events')
49         clearPreviousInstrumentation()
50     }
51
52     def cleanup() {
53         kafkaConsumer.unsubscribe()
54         kafkaConsumer.close()
55     }
56
57     def 'CM Handle registration.'() {
58         given: 'DMI will return modules when requested'
59             dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
60             dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2']
61         when: 'a CM-handle is registered for creation'
62             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId,
63                                                            alternateId: 'fdn1',
64                                                            moduleSetTag: 'tag1',
65                                                            dataProducerIdentifier: 'prod1',
66                                                            publicProperties: [color:'green'])
67             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
68             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
69         then: 'registration gives successful response'
70             assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(uniqueId)]
71         and: 'CM-handle is initially in ADVISED state'
72             assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
73         then: 'the module sync watchdog is triggered'
74             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
75         then: 'CM-handle goes to READY state after module sync'
76             assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
77         and: 'the CM-handle has expected modules'
78             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
79         then: 'get the last 2 messages and related headers'
80             def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
81             def messages = []
82             def headerMaps = []
83             consumerRecords.each { consumerRecord ->
84                 messages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEventV1))
85                 headerMaps.add(getHeadersAsMap(consumerRecord))
86             }
87         and: 'both messages have the correct common attributes (that did not change)'
88             assert messages.event.every {
89                 it.cmHandleId == uniqueId &&
90                 it.alternateId == 'fdn1' &&
91                 it.moduleSetTag == 'tag1' &&
92                 it.dataProducerIdentifier == 'prod1'
93             }
94         and: 'the header fields are populated correctly and stored as kafka headers too'
95             validateEventHeaders(messages[0], headerMaps[0], 'create')
96             validateEventHeaders(messages[1], headerMaps[1], 'update')
97         and: 'the first lcm event has no old values and the initial attributes as new values state ADVISED'
98             with(messages[0].event) {
99                 assert oldValues == null
100                 assert newValues.cmHandleState.value() == 'ADVISED'
101                 assert newValues.dataSyncEnabled == null
102                 assert newValues.cmHandleProperties[0] == [color:'green']
103             }
104         and: 'the next event is about update to READY state (new value), the old value for state is ADVISED'
105             assert messages[1].event.oldValues.cmHandleState.value() == 'ADVISED'
106             assert messages[1].event.newValues.cmHandleState.value() == 'READY'
107         and: 'the cm handle (public) properties have not changed and are therefore null for old and new values'
108             assert messages[1].event.oldValues.cmHandleProperties == null
109             assert messages[1].event.newValues.cmHandleProperties == null
110         and: 'the data sync flag goes from undefined to false'
111             assert messages[1].event.oldValues.dataSyncEnabled == null
112             assert messages[1].event.newValues.dataSyncEnabled == false
113         and: 'there are no more messages to be read'
114             assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1).size() == 0
115         and: 'instrumentation has recorded 2 events (null > ADVISED, ADVISED > READY)'
116             new PollingConditions().within(5) {
117                 assert countLcmEventTimerInvocations() == 2
118             }
119         then: 'deregister CM handle'
120             deregisterCmHandle(DMI1_URL, uniqueId)
121         and: 'instrumentation has recorded 2 more events (READY > DELETING, DELETING > null)'
122             new PollingConditions().within(5) {
123                 assert countLcmEventTimerInvocations() == 2 + 2
124             }
125     }
126
127     def 'CM Handle registration with DMI error during module sync.'() {
128         given: 'DMI is not available to handle requests'
129             dmiDispatcher1.isAvailable = false
130         when: 'a CM-handle is registered for creation'
131             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
132             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
133             objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
134         and: 'the module sync watchdog is triggered'
135             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
136         then: 'CM-handle goes to LOCKED state with reason MODULE_SYNC_FAILED'
137             def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState('ch-1')
138             assert cmHandleCompositeState.cmHandleState == CmHandleState.LOCKED
139             assert cmHandleCompositeState.lockReason.lockReasonCategory == LockReasonCategory.MODULE_SYNC_FAILED
140         and: 'CM-handle has no modules'
141             assert objectUnderTest.getYangResourcesModuleReferences('ch-1').empty
142         cleanup: 'deregister CM handle'
143             deregisterCmHandle(DMI1_URL, 'ch-1')
144     }
145
146     def 'Create a CM-handle with existing moduleSetTag.'() {
147         given: 'DMI will return modules when requested'
148             dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M3']]
149         and: 'existing CM-handles cm-1 with moduleSetTag "A", and cm-2 with moduleSetTag "B"'
150             registerCmHandle(DMI1_URL, 'ch-1', 'A')
151             registerCmHandle(DMI1_URL, 'ch-2', 'B')
152         when: 'a CM-handle is registered for creation with moduleSetTag "B"'
153             def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-3', moduleSetTag: 'B')
154             objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]))
155         and: 'the module sync watchdog is triggered'
156             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
157         then: 'the CM-handle goes to READY state'
158             assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState('ch-3').cmHandleState
159         and: 'the CM-handle has expected moduleSetTag'
160             assert objectUnderTest.getNcmpServiceCmHandle('ch-3').moduleSetTag == 'B'
161         and: 'the CM-handle has expected modules from module set "B": M1 and M3'
162             assert ['M1', 'M3'] == objectUnderTest.getYangResourcesModuleReferences('ch-3').moduleName.sort()
163         cleanup: 'deregister CM handles'
164             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2', 'ch-3'])
165     }
166
167     def 'Create CM-handles with alternate IDs.'() {
168         given: 'DMI will return modules for all CM-handles when requested'
169             dmiDispatcher1.moduleNamesPerCmHandleId = (1..7).collectEntries{ ['ch-'+it, ['M1']] }
170         and: 'an existing CM-handle with an alternate ID'
171             registerCmHandle(DMI1_URL, 'ch-1', NO_MODULE_SET_TAG, 'existing-alt-id')
172         and: 'an existing CM-handle with no alternate ID'
173             registerCmHandle(DMI1_URL, 'ch-2', NO_MODULE_SET_TAG, NO_ALTERNATE_ID)
174         when: 'a batch of CM-handles is registered for creation with various alternate IDs'
175             def cmHandlesToCreate = [
176                 new NcmpServiceCmHandle(cmHandleId: 'ch-3', alternateId: NO_ALTERNATE_ID),
177                 new NcmpServiceCmHandle(cmHandleId: 'ch-4', alternateId: 'unique-alt-id'),
178                 new NcmpServiceCmHandle(cmHandleId: 'ch-5', alternateId: 'existing-alt-id'),
179                 new NcmpServiceCmHandle(cmHandleId: 'ch-6', alternateId: 'duplicate-alt-id'),
180                 new NcmpServiceCmHandle(cmHandleId: 'ch-7', alternateId: 'duplicate-alt-id'),
181             ]
182             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
183             def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
184         then: 'registration gives expected responses'
185             assert dmiPluginRegistrationResponse.createdCmHandles.sort { it.cmHandle } == [
186                 CmHandleRegistrationResponse.createSuccessResponse('ch-3'),
187                 CmHandleRegistrationResponse.createSuccessResponse('ch-4'),
188                 CmHandleRegistrationResponse.createFailureResponse('ch-5', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST),
189                 CmHandleRegistrationResponse.createSuccessResponse('ch-6'),
190                 CmHandleRegistrationResponse.createFailureResponse('ch-7', NcmpResponseStatus.CM_HANDLE_ALREADY_EXIST),
191             ]
192         cleanup: 'deregister CM handles'
193             deregisterCmHandles(DMI1_URL, (1..7).collect{ 'ch-'+it })
194     }
195
196     def 'CM Handle retry after failed module sync.'() {
197         given: 'DMI is not initially available to handle requests'
198             dmiDispatcher1.isAvailable = false
199         when: 'CM-handles are registered for creation'
200             def cmHandlesToCreate = [new NcmpServiceCmHandle(cmHandleId: 'ch-1'), new NcmpServiceCmHandle(cmHandleId: 'ch-2')]
201             def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
202             objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
203         and: 'the module sync watchdog is triggered'
204             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
205         then: 'CM-handles go to LOCKED state'
206             assert objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState == CmHandleState.LOCKED
207         when: 'DMI is available for retry'
208             dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M2']]
209             dmiDispatcher1.isAvailable = true
210         and: 'the module sync watchdog is triggered TWICE'
211             2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
212         then: 'Both CM-handles go to READY state'
213             ['ch-1', 'ch-2'].each { cmHandleId ->
214                 assert objectUnderTest.getCmHandleCompositeState(cmHandleId).cmHandleState == CmHandleState.READY
215             }
216         and: 'Both CM-handles have expected modules'
217             ['ch-1', 'ch-2'].each { cmHandleId ->
218                 assert objectUnderTest.getYangResourcesModuleReferences(cmHandleId).moduleName.sort() == ['M1', 'M2']
219             }
220         cleanup: 'deregister CM handles'
221             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
222     }
223
224     def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
225         kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
226         kafkaConsumer.subscribe([topicName])
227         kafkaConsumer.poll(Duration.ofMillis(500))
228     }
229
230     def getHeadersAsMap(consumerRecord) {
231         def headersAsMap = [:]
232         consumerRecord.headers().each { header ->
233             def value = (new String((byte[]) header.value())).substring(7) // The raw header is prefixed with encoded type
234             headersAsMap.put(header.key(), value)
235         }
236         return headersAsMap
237     }
238
239     def validateEventHeaders(message, headerAsMap, expectedEventType) {
240         with(message) {
241             assert UUID.fromString(eventId) != null
242             assert headerAsMap.get('eventId') == eventId
243             assert eventCorrelationId == uniqueId
244             assert headerAsMap.get('eventCorrelationId') == eventCorrelationId
245             assert timestampIsVeryRecent(eventTime)
246             assert headerAsMap.get('eventTime') == eventTime
247             assert eventSource == 'org.onap.ncmp'
248             assert headerAsMap.get('eventSource') == eventSource
249             assert eventType == 'org.onap.ncmp.cmhandle-lcm-event.'+expectedEventType
250             assert headerAsMap.get('eventType') == eventType
251             assert eventSchema == 'org.onap.ncmp:cmhandle-lcm-event'
252             assert headerAsMap.get('eventSchema') == eventSchema
253             assert eventSchemaVersion == '1.0'
254             assert headerAsMap.get('eventSchemaVersion') == eventSchemaVersion
255         }
256         return true
257     }
258
259     def countLcmEventTimerInvocations() {
260         def totalCountForAllTagCombinations = 0
261         for (def timer : meterRegistry.get('cps.ncmp.lcm.events.send').timers()) {
262             totalCountForAllTagCombinations = totalCountForAllTagCombinations + timer.count()
263         }
264         return totalCountForAllTagCombinations
265     }
266
267 }