2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription
23 import ch.qos.logback.classic.Level
24 import ch.qos.logback.classic.Logger
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import com.fasterxml.jackson.databind.ObjectMapper
28 import io.cloudevents.CloudEvent
29 import io.cloudevents.core.builder.CloudEventBuilder
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.onap.cps.ncmp.dmi.TestUtils
32 import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
33 import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus
34 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper
35 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
36 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
37 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent
38 import org.slf4j.LoggerFactory
39 import org.spockframework.spring.SpringBean
40 import org.springframework.boot.test.context.SpringBootTest
41 import org.springframework.test.annotation.DirtiesContext
42 import org.testcontainers.spock.Testcontainers
44 import java.sql.Timestamp
45 import java.time.Duration
46 import java.time.OffsetDateTime
47 import java.time.ZoneId
50 @SpringBootTest(classes = [CmNotificationSubscriptionDmiInEventConsumer])
53 class CmNotificationSubscriptionDmiInEventConsumerSpec extends MessagingBaseSpec {
54 def objectMapper = new ObjectMapper()
55 def testTopic = 'dmi-ncmp-cm-avc-subscription'
58 CmNotificationSubscriptionDmiInEventConsumer objectUnderTest = new CmNotificationSubscriptionDmiInEventConsumer(cloudEventKafkaTemplate)
60 def logger = Spy(ListAppender<ILoggingEvent>)
63 ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).addAppender(logger)
68 ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders()
71 def 'Sends subscription cloud event response successfully.'() {
72 given: 'an subscription event response'
73 objectUnderTest.dmiName = 'test-ncmp-dmi'
74 objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
75 def correlationId = 'test-subscriptionId#test-ncmp-dmi'
76 def cmSubscriptionDmiOutEventData = new Data(statusCode: '1', statusMessage: 'ACCEPTED')
77 def subscriptionEventResponse =
78 new CmNotificationSubscriptionDmiOutEvent().withData(cmSubscriptionDmiOutEventData)
79 and: 'consumer has a subscription'
80 kafkaConsumer.subscribe([testTopic] as List<String>)
81 when: 'an event is published'
82 def eventKey = UUID.randomUUID().toString()
83 objectUnderTest.createAndSendCmNotificationSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus", correlationId, CmNotificationSubscriptionStatus.ACCEPTED)
84 and: 'topic is polled'
85 def records = kafkaConsumer.poll(Duration.ofMillis(1500))
86 then: 'poll returns one record'
87 assert records.size() == 1
88 def record = records.iterator().next()
89 and: 'the record value matches the expected event value'
90 def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
91 assert expectedValue == record.value
92 assert eventKey == record.key
95 def 'Consume valid message.'() {
97 objectUnderTest.dmiName = 'test-ncmp-dmi'
98 def eventKey = UUID.randomUUID().toString()
99 def timestamp = new Timestamp(1679521929511)
100 def jsonData = TestUtils.getResourceFileContent('cmNotificationSubscriptionCreationEvent.json')
101 def subscriptionEvent = objectMapper.readValue(jsonData, CmNotificationSubscriptionDmiInEvent.class)
102 objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
103 def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
104 .withType(subscriptionType)
105 .withDataSchema(URI.create("urn:cps:" + CmNotificationSubscriptionDmiInEvent.class.getName() + ":1.0.0"))
106 .withExtension("correlationid", eventKey)
107 .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
108 .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
109 def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
110 when: 'the valid event is consumed'
111 objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
112 then: 'no exception is thrown'
114 where: 'given #scenario'
115 scenario | subscriptionType
116 'Subscription Create Event' | "subscriptionCreated"
117 'Subscription Delete Event' | "subscriptionDeleted"
120 def 'Consume invalid message.'() {
121 given: 'an invalid event body'
122 objectUnderTest.dmiName = 'test-ncmp-dmi'
123 def eventKey = UUID.randomUUID().toString()
124 def timestamp = new Timestamp(1679521929511)
125 def invalidJsonBody = "/////"
126 objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
127 def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
128 .withType("subscriptionCreated")
129 .withDataSchema(URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0"))
130 .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
131 .withExtension("correlationid", eventKey).withData(objectMapper.writeValueAsBytes(invalidJsonBody)).build()
132 def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
133 when: 'the invalid event is consumed'
134 objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
135 then: 'exception is thrown and event is logged'
136 def loggingEvent = getLoggingEvent()
137 assert loggingEvent.level == Level.ERROR
138 assert loggingEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
141 def getLoggingEvent() {
142 return logger.list[0]