2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.impl.inventory.sync.lcm
23 import io.micrometer.core.instrument.Tag
24 import io.micrometer.core.instrument.simple.SimpleMeterRegistry
25 import org.onap.cps.events.EventProducer
26 import org.onap.cps.ncmp.api.inventory.models.CompositeState
27 import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
28 import org.springframework.kafka.KafkaException
29 import spock.lang.Specification
31 import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED
32 import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY
34 class LcmEventProducerSpec extends Specification {
// Collaborators: the event producer is a Spock mock so its interactions can be verified;
// the event object creator and the meter registry are real instances.
36 def mockEventProducer = Mock(EventProducer)
37 def lcmEventObjectCreator = new LcmEventObjectCreator()
38 def meterRegistry = new SimpleMeterRegistry()
// The producer under test, wired with the three collaborators declared above.
40 def objectUnderTest = new LcmEventProducer(mockEventProducer, lcmEventObjectCreator, meterRegistry)
// Transition fixture shared by the feature methods: the same cm handle id 'ch-1',
// first in state ADVISED (first argument) and then in state READY (second argument).
42 def cmHandleTransitionPair = new CmHandleTransitionPair(
43 new YangModelCmHandle(id: 'ch-1', compositeState: new CompositeState(cmHandleState: ADVISED), additionalProperties: [], publicProperties: []),
44 new YangModelCmHandle(id: 'ch-1', compositeState: new CompositeState(cmHandleState: READY), additionalProperties: [], publicProperties: [])
// Data-driven feature: sets the notificationsEnabled flag from the where-table, sends a
// single-pair batch and checks how many times the mocked producer is invoked, plus the
// timer recorded in the meter registry.
47 def 'Create and send lcm event where notifications are #scenario.'() {
48 given: 'notificationsEnabled is #notificationsEnabled'
49 objectUnderTest.notificationsEnabled = notificationsEnabled
50 when: 'service is called to send a batch of lcm events'
51 objectUnderTest.sendLcmEventBatchAsynchronously([cmHandleTransitionPair])
52 then: 'producer is called #expectedTimesMethodCalled times with correct identifiers'
// Cardinality comes from the data table; the second argument must be the cm handle id 'ch-1'.
53 expectedTimesMethodCalled * mockEventProducer.sendLegacyEvent(_, 'ch-1', _, _) >> {
// The third argument of sendLegacyEvent is read as the event headers.
55 def eventHeaders = args[2]
// UUID.fromString throws on a malformed value, so this also checks that eventId is a well-formed UUID.
56 assert UUID.fromString(eventHeaders.get('eventId')) != null
57 assert eventHeaders.get('eventCorrelationId') == 'ch-1'
60 and: 'metrics are recorded with correct tags'
61 def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer()
// Timer assertions are only made when notifications are enabled.
62 if (notificationsEnabled) {
63 assert timer.count() == 1
// The timer must carry both state tags, matching the ADVISED -> READY fixture.
64 assert timer.id.tags.containsAll(Tag.of('oldCmHandleState', 'ADVISED'), Tag.of('newCmHandleState', 'READY'))
68 where: 'the following values are used'
// Columns left of '||' are inputs; the column to the right is the expected interaction count.
69 scenario | notificationsEnabled || expectedTimesMethodCalled
71 'disabled' | false || 0
// Covers the failure path: the mocked producer throws a KafkaException while a batch is
// sent with notifications enabled.
74 def 'Exception while sending message.'(){
75 given: 'notifications are enabled'
76 objectUnderTest.notificationsEnabled = true
77 when: 'producer set to throw an exception'
// '*_' matches any argument list, so every sendLegacyEvent call throws.
78 mockEventProducer.sendLegacyEvent(*_) >> { throw new KafkaException('sending failed')}
79 and: 'attempt to send events'
80 objectUnderTest.sendLcmEventBatchAsynchronously([cmHandleTransitionPair])
81 then: 'the exception is just logged and not bubbled up'
83 and: 'metrics are not recorded'
// find(...).timer() yields null when no timer with that name has been registered.
84 assert meterRegistry.find('cps.ncmp.lcm.events.send').timer() == null