2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription
23 import ch.qos.logback.classic.Level
24 import ch.qos.logback.classic.Logger
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import com.fasterxml.jackson.databind.ObjectMapper
28 import io.cloudevents.CloudEvent
29 import io.cloudevents.core.builder.CloudEventBuilder
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.onap.cps.ncmp.dmi.TestUtils
32 import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
33 import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus
34 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper
35 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
36 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
37 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent
38 import org.slf4j.LoggerFactory
39 import org.spockframework.spring.SpringBean
40 import org.springframework.boot.test.context.SpringBootTest
41 import org.springframework.test.annotation.DirtiesContext
42 import org.testcontainers.spock.Testcontainers
44 import java.sql.Timestamp
45 import java.time.Duration
46 import java.time.OffsetDateTime
47 import java.time.ZoneId
/**
 * Spock specification for {@link CmNotificationSubscriptionDmiInEventConsumer}.
 *
 * Covers three behaviours of the consumer under test:
 * publishing a DMI-out subscription response to Kafka, consuming a well-formed
 * DMI-in cloud event, and consuming a cloud event whose payload cannot be mapped
 * (which is verified through the error logged by {@link CloudEventMapper}).
 *
 * NOTE(review): the file imports SpringBootTest, Testcontainers, DirtiesContext and
 * SpringBean, but no usage of them is visible in this class - confirm whether the
 * corresponding annotations are expected on the class / on objectUnderTest.
 */
class CmNotificationSubscriptionDmiInEventConsumerSpec extends MessagingBaseSpec {

    def objectMapper = new ObjectMapper()
    def testTopic = 'dmi-ncmp-cm-avc-subscription'
    def testDmiName = 'test-ncmp-dmi'

    // cloudEventKafkaTemplate is not declared in this class; presumably inherited from MessagingBaseSpec.
    CmNotificationSubscriptionDmiInEventConsumer objectUnderTest = new CmNotificationSubscriptionDmiInEventConsumer(cloudEventKafkaTemplate)

    // In-memory appender used to capture what CloudEventMapper logs.
    def logger = Spy(ListAppender<ILoggingEvent>)

    /**
     * Attaches the in-memory appender to the CloudEventMapper logger before each feature.
     */
    def setup() {
        ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).addAppender(logger)
        // A Logback appender discards events until it has been started, so without this
        // call logger.list would stay empty and getLoggingEvent() would return null.
        logger.start()
    }

    /**
     * Detaches and stops every appender on the CloudEventMapper logger after each feature.
     */
    def cleanup() {
        ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders()
    }

    def 'Sends subscription cloud event response successfully.'() {
        given: 'an subscription event response'
            objectUnderTest.dmiName = testDmiName
            objectUnderTest.cmNotificationSubscriptionDmiOutTopic = testTopic
            def correlationId = 'test-subscriptionId#test-ncmp-dmi'
            def cmSubscriptionDmiOutEventData = new Data(statusCode: subscriptionStatusCode, statusMessage: subscriptionStatusMessage)
            def subscriptionEventResponse =
                    new CmNotificationSubscriptionDmiOutEvent().withData(cmSubscriptionDmiOutEventData)
        and: 'consumer has a subscription'
            kafkaConsumer.subscribe([testTopic] as List<String>)
        when: 'an event is published'
            def eventKey = UUID.randomUUID().toString()
            objectUnderTest.createAndSendCmNotificationSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus", correlationId, subscriptionAcceptanceType)
        and: 'topic is polled'
            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
        then: 'poll returns one record and close kafkaConsumer'
            // NOTE(review): the label mentions closing kafkaConsumer, but no close() call is
            // visible here. Whether closing is safe depends on how MessagingBaseSpec manages
            // the consumer's lifecycle - confirm before adding it.
            assert records.size() == 1
            def record = records.iterator().next()
        and: 'the record value matches the expected event value'
            // The published payload is compared as a JSON string built from the expected event.
            def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
            assert expectedValue == record.value
            assert eventKey == record.key
        where: 'given #scenario'
            scenario                   | subscriptionAcceptanceType                | subscriptionStatusCode | subscriptionStatusMessage
            'Subscription is Accepted' | CmNotificationSubscriptionStatus.ACCEPTED | '1'                    | 'ACCEPTED'
            'Subscription is Rejected' | CmNotificationSubscriptionStatus.REJECTED | '104'                  | 'REJECTED'
    }

    def 'Consume valid message.'() {
        given: 'a cloud event carrying a valid subscription event'
            objectUnderTest.dmiName = testDmiName
            def eventKey = UUID.randomUUID().toString()
            def timestamp = new Timestamp(1679521929511)
            def jsonData = TestUtils.getResourceFileContent('cmNotificationSubscriptionCreationEvent.json')
            def subscriptionEvent = objectMapper.readValue(jsonData, CmNotificationSubscriptionDmiInEvent.class)
            objectUnderTest.cmNotificationSubscriptionDmiOutTopic = testTopic
            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
                    .withType(subscriptionType)
                    .withDataSchema(URI.create("urn:cps:" + CmNotificationSubscriptionDmiInEvent.class.getName() + ":1.0.0"))
                    .withExtension("correlationid", eventKey)
                    .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
                    .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
        when: 'the valid event is consumed'
            objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
        then: 'no exception is thrown'
            // Make the expectation explicit instead of leaving the block empty.
            noExceptionThrown()
        where: 'given #scenario'
            scenario                    | subscriptionType
            'Subscription Create Event' | "subscriptionCreated"
            'Subscription Delete Event' | "subscriptionDeleted"
    }

    def 'Consume invalid message.'() {
        given: 'an invalid event body'
            objectUnderTest.dmiName = testDmiName
            def eventKey = UUID.randomUUID().toString()
            def timestamp = new Timestamp(1679521929511)
            // Serialised below as a plain JSON string rather than an event object.
            def invalidJsonBody = "/////"
            objectUnderTest.cmNotificationSubscriptionDmiOutTopic = testTopic
            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
                    .withType("subscriptionCreated")
                    .withDataSchema(URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0"))
                    .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
                    .withExtension("correlationid", eventKey).withData(objectMapper.writeValueAsBytes(invalidJsonBody)).build()
            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
        when: 'the invalid event is consumed'
            objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
        then: 'exception is thrown and event is logged'
            def loggingEvent = getLoggingEvent()
            // Fail with a clear message rather than a null dereference when nothing was logged.
            assert loggingEvent != null : 'expected CloudEventMapper to log an event, but none was captured'
            assert loggingEvent.level == Level.ERROR
            assert loggingEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
    }

    /**
     * Returns the first logging event captured by the in-memory appender,
     * or null when nothing has been captured.
     */
    def getLoggingEvent() {
        return logger.list[0]
    }
}