2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription
23 import ch.qos.logback.classic.Level
24 import ch.qos.logback.classic.Logger
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import com.fasterxml.jackson.databind.ObjectMapper
28 import io.cloudevents.CloudEvent
29 import io.cloudevents.core.builder.CloudEventBuilder
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.onap.cps.ncmp.dmi.TestUtils
32 import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
33 import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus
34 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper
35 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data
36 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent
37 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
38 import org.slf4j.LoggerFactory
39 import org.spockframework.spring.SpringBean
40 import org.springframework.test.annotation.DirtiesContext
41 import org.testcontainers.spock.Testcontainers
43 import java.sql.Timestamp
44 import java.time.Duration
45 import java.time.OffsetDateTime
46 import java.time.ZoneId
51 class DmiInEventConsumerSpec extends MessagingBaseSpec {
52 def objectMapper = new ObjectMapper()
53 def testTopic = 'dmi-ncmp-cm-avc-subscription'
54 def testDmiName = 'test-ncmp-dmi'
57 DmiInEventConsumer objectUnderTest = new DmiInEventConsumer(cloudEventKafkaTemplate)
59 def logger = Spy(ListAppender<ILoggingEvent>)
62 ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).addAppender(logger)
67 ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders()
70 def 'Sends subscription cloud event response successfully.'() {
71 given: 'an subscription event response'
72 objectUnderTest.dmiName = testDmiName
73 objectUnderTest.dmoOutEventTopic = testTopic
74 def correlationId = 'test-subscriptionId#test-ncmp-dmi'
75 def cmSubscriptionDmiOutEventData = new Data(statusCode: subscriptionStatusCode, statusMessage: subscriptionStatusMessage)
76 def subscriptionEventResponse =
77 new DmiOutEvent().withData(cmSubscriptionDmiOutEventData)
78 and: 'consumer has a subscription'
79 kafkaConsumer.subscribe([testTopic] as List<String>)
80 when: 'an event is published'
81 def eventKey = UUID.randomUUID().toString()
82 objectUnderTest.createAndSendCmNotificationSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus", correlationId, subscriptionAcceptanceType)
83 and: 'topic is polled'
84 def records = kafkaConsumer.poll(Duration.ofMillis(1500))
85 then: 'poll returns one record and close kafkaConsumer'
86 assert records.size() == 1
87 def record = records.iterator().next()
89 and: 'the record value matches the expected event value'
90 def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
91 assert expectedValue == record.value
92 assert eventKey == record.key
93 where: 'given #scenario'
94 scenario | subscriptionAcceptanceType | subscriptionStatusCode | subscriptionStatusMessage
95 'Subscription is Accepted' | CmNotificationSubscriptionStatus.ACCEPTED | '1' | 'ACCEPTED'
96 'Subscription is Rejected' | CmNotificationSubscriptionStatus.REJECTED | '104' | 'REJECTED'
99 def 'Consume valid message.'() {
101 objectUnderTest.dmiName = testDmiName
102 def eventKey = UUID.randomUUID().toString()
103 def timestamp = new Timestamp(1679521929511)
104 def jsonData = TestUtils.getResourceFileContent('cmNotificationSubscriptionCreationEvent.json')
105 def subscriptionEvent = objectMapper.readValue(jsonData, DmiInEvent.class)
106 objectUnderTest.dmoOutEventTopic = testTopic
107 def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
108 .withType(subscriptionType)
109 .withDataSchema(URI.create("urn:cps:" + DmiInEvent.class.getName() + ":1.0.0"))
110 .withExtension("correlationid", eventKey)
111 .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
112 .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
113 def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
114 when: 'the valid event is consumed'
115 objectUnderTest.consumeDmiInEvent(testEventSent)
116 then: 'no exception is thrown'
118 where: 'given #scenario'
119 scenario | subscriptionType
120 'Subscription Create Event' | "subscriptionCreated"
121 'Subscription Delete Event' | "subscriptionDeleted"
124 def 'Consume invalid message.'() {
125 given: 'an invalid event body'
126 objectUnderTest.dmiName = testDmiName
127 def eventKey = UUID.randomUUID().toString()
128 def timestamp = new Timestamp(1679521929511)
129 def invalidJsonBody = "/////"
130 objectUnderTest.dmoOutEventTopic = testTopic
131 def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
132 .withType("subscriptionCreated")
133 .withDataSchema(URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0"))
134 .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
135 .withExtension("correlationid", eventKey).withData(objectMapper.writeValueAsBytes(invalidJsonBody)).build()
136 def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
137 when: 'the invalid event is consumed'
138 objectUnderTest.consumeDmiInEvent(testEventSent)
139 then: 'exception is thrown and event is logged'
140 def loggingEvent = getLoggingEvent()
141 assert loggingEvent.level == Level.ERROR
142 assert loggingEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
145 def getLoggingEvent() {
146 return logger.list[0]