52fa37943293be82a2556ba40c5131eab89208c0
[cps/ncmp-dmi-plugin.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2024 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription
22
23 import ch.qos.logback.classic.Level
24 import ch.qos.logback.classic.Logger
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import com.fasterxml.jackson.databind.ObjectMapper
28 import io.cloudevents.CloudEvent
29 import io.cloudevents.core.builder.CloudEventBuilder
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.onap.cps.ncmp.dmi.TestUtils
32 import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
33 import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus
34 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper
35 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data
36 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent
37 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
38 import org.slf4j.LoggerFactory
39 import org.spockframework.spring.SpringBean
40 import org.springframework.test.annotation.DirtiesContext
41 import org.testcontainers.spock.Testcontainers
42
43 import java.sql.Timestamp
44 import java.time.Duration
45 import java.time.OffsetDateTime
46 import java.time.ZoneId
47
48
/**
 * Spock specification for {@link DmiInEventConsumer}.
 *
 * Covers three things: that a DMI out event ends up as a record on the configured topic,
 * that well-formed DMI in events are consumed without an exception, and that a cloud event
 * whose payload cannot be mapped results in an ERROR entry on the {@link CloudEventMapper} logger.
 */
@Testcontainers
@DirtiesContext
class DmiInEventConsumerSpec extends MessagingBaseSpec {
    def objectMapper = new ObjectMapper()
    def testTopic = 'dmi-ncmp-cm-avc-subscription'
    def testDmiName = 'test-ncmp-dmi'

    @SpringBean
    DmiInEventConsumer objectUnderTest = new DmiInEventConsumer(cloudEventKafkaTemplate)

    // In-memory appender used to inspect what the CloudEventMapper logger writes.
    def logger = Spy(ListAppender<ILoggingEvent>)

    /** Attaches the in-memory appender to the CloudEventMapper logger, then starts it. */
    void setup() {
        cloudEventMapperLogger().addAppender(logger)
        logger.start()
    }

    /** Detaches and stops every appender registered on the CloudEventMapper logger. */
    void cleanup() {
        cloudEventMapperLogger().detachAndStopAllAppenders()
    }

    /**
     * Publishes a DMI out event through the object under test and checks that exactly one record,
     * carrying the expected key and JSON value, can be polled from the test topic.
     */
    def 'Sends subscription cloud event response successfully.'() {
        given: 'an subscription event response'
            configureObjectUnderTest()
            def correlationId = 'test-subscriptionId#test-ncmp-dmi'
            def expectedData = new Data(statusCode: subscriptionStatusCode, statusMessage: subscriptionStatusMessage)
            def expectedDmiOutEvent = new DmiOutEvent().withData(expectedData)
        and: 'consumer has a subscription'
            kafkaConsumer.subscribe([testTopic] as List<String>)
        when: 'an event is published'
            def eventKey = UUID.randomUUID().toString()
            objectUnderTest.createAndSendCmNotificationSubscriptionDmiOutEvent(
                    eventKey, "subscriptionCreatedStatus", correlationId, subscriptionAcceptanceType)
        and: 'topic is polled'
            def pollTimeout = Duration.ofMillis(1500)
            def polledRecords = kafkaConsumer.poll(pollTimeout)
        then: 'poll returns one record and close kafkaConsumer'
            assert polledRecords.size() == 1
            def firstRecord = polledRecords.iterator().next()
            kafkaConsumer.close()
        and: 'the record value matches the expected event value'
            def expectedJson = objectMapper.writeValueAsString(expectedDmiOutEvent)
            assert firstRecord.value == expectedJson
            assert firstRecord.key == eventKey
        where: 'given #scenario'
            scenario                   | subscriptionAcceptanceType                | subscriptionStatusCode | subscriptionStatusMessage
            'Subscription is Accepted' | CmNotificationSubscriptionStatus.ACCEPTED | '1'                    | 'ACCEPTED'
            'Subscription is Rejected' | CmNotificationSubscriptionStatus.REJECTED | '104'                  | 'REJECTED'
    }

    /**
     * Wraps the DMI in event read from the JSON test resource in a cloud event of the given type
     * and checks that consuming it raises no exception.
     */
    def 'Consume valid message.'() {
        given: 'an event'
            configureObjectUnderTest()
            def eventKey = UUID.randomUUID().toString()
            def resourceContent = TestUtils.getResourceFileContent('cmNotificationSubscriptionCreationEvent.json')
            def dmiInEvent = objectMapper.readValue(resourceContent, DmiInEvent.class)
            def dataSchema = URI.create("urn:cps:" + DmiInEvent.class.getName() + ":1.0.0")
            def cloudEvent = CloudEventBuilder.v1()
                    .withId(UUID.randomUUID().toString())
                    .withSource(URI.create('test-ncmp-dmi'))
                    .withType(subscriptionType)
                    .withDataSchema(dataSchema)
                    .withExtension("correlationid", eventKey)
                    .withTime(fixedEventTime())
                    .withData(objectMapper.writeValueAsBytes(dmiInEvent))
                    .build()
            def consumedRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
        when: 'the valid event is consumed'
            objectUnderTest.consumeDmiInEvent(consumedRecord)
        then: 'no exception is thrown'
            noExceptionThrown()
        where: 'given #scenario'
            scenario                    | subscriptionType
            'Subscription Create Event' | "subscriptionCreated"
            'Subscription Delete Event' | "subscriptionDeleted"
    }

    /**
     * Feeds the consumer a cloud event whose payload is the JSON-serialised string "/////".
     * The assertions inspect the first captured logging event (ERROR level, mapping failure message);
     * no thrown exception is asserted here, despite the wording of the 'then' label.
     */
    def 'Consume invalid message.'() {
        given: 'an invalid event body'
            configureObjectUnderTest()
            def eventKey = UUID.randomUUID().toString()
            def invalidJsonBody = "/////"
            def dataSchema = URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0")
            def cloudEvent = CloudEventBuilder.v1()
                    .withId(UUID.randomUUID().toString())
                    .withSource(URI.create('test-ncmp-dmi'))
                    .withType("subscriptionCreated")
                    .withDataSchema(dataSchema)
                    .withTime(fixedEventTime())
                    .withExtension("correlationid", eventKey)
                    .withData(objectMapper.writeValueAsBytes(invalidJsonBody))
                    .build()
            def consumedRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
        when: 'the invalid event is consumed'
            objectUnderTest.consumeDmiInEvent(consumedRecord)
        then: 'exception is thrown and event is logged'
            def firstLoggedEvent = getLoggingEvent()
            assert firstLoggedEvent.level == Level.ERROR
            assert firstLoggedEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
    }

    /** Returns the first event captured by the in-memory appender (null when nothing was captured). */
    def getLoggingEvent() {
        logger.list.getAt(0)
    }

    /** Points the object under test at the DMI name and out-event topic used by this spec. */
    private void configureObjectUnderTest() {
        objectUnderTest.dmiName = testDmiName
        objectUnderTest.dmoOutEventTopic = testTopic
    }

    /** Looks up the Logback logger belonging to CloudEventMapper. */
    private static Logger cloudEventMapperLogger() {
        (Logger) LoggerFactory.getLogger(CloudEventMapper.class)
    }

    /** Builds the fixed UTC event time (epoch millis 1679521929511) stamped on the test cloud events. */
    private static OffsetDateTime fixedEventTime() {
        def timestamp = new Timestamp(1679521929511)
        OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC"))
    }
}