4795343933b709aaf06eee8dbbae4a4f9cfd0430
[cps/ncmp-dmi-plugin.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2024 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription
22
23 import ch.qos.logback.classic.Level
24 import ch.qos.logback.classic.Logger
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import com.fasterxml.jackson.databind.ObjectMapper
28 import io.cloudevents.CloudEvent
29 import io.cloudevents.core.builder.CloudEventBuilder
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.onap.cps.ncmp.dmi.TestUtils
32 import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
33 import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus
34 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper
35 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
36 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
37 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent
38 import org.slf4j.LoggerFactory
39 import org.spockframework.spring.SpringBean
40 import org.springframework.boot.test.context.SpringBootTest
41 import org.springframework.test.annotation.DirtiesContext
42 import org.testcontainers.spock.Testcontainers
43
44 import java.sql.Timestamp
45 import java.time.Duration
46 import java.time.OffsetDateTime
47 import java.time.ZoneId
48
49
50 @SpringBootTest(classes = [CmNotificationSubscriptionDmiInEventConsumer])
51 @Testcontainers
52 @DirtiesContext
53 class CmNotificationSubscriptionDmiInEventConsumerSpec extends MessagingBaseSpec {
54     def objectMapper = new ObjectMapper()
55     def testTopic = 'dmi-ncmp-cm-avc-subscription'
56
57     @SpringBean
58     CmNotificationSubscriptionDmiInEventConsumer objectUnderTest = new CmNotificationSubscriptionDmiInEventConsumer(cloudEventKafkaTemplate)
59
60     def logger = Spy(ListAppender<ILoggingEvent>)
61
62     void setup() {
63         ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).addAppender(logger)
64         logger.start()
65     }
66
67     void cleanup() {
68         ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders()
69     }
70
71     def 'Sends subscription cloud event response successfully.'() {
72         given: 'an subscription event response'
73             objectUnderTest.dmiName = 'test-ncmp-dmi'
74             objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
75             def correlationId = 'test-subscriptionId#test-ncmp-dmi'
76             def cmSubscriptionDmiOutEventData = new Data(statusCode: '1', statusMessage: 'ACCEPTED')
77             def subscriptionEventResponse =
78                     new CmNotificationSubscriptionDmiOutEvent().withData(cmSubscriptionDmiOutEventData)
79         and: 'consumer has a subscription'
80             kafkaConsumer.subscribe([testTopic] as List<String>)
81         when: 'an event is published'
82             def eventKey = UUID.randomUUID().toString()
83             objectUnderTest.createAndSendCmNotificationSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus", correlationId, CmNotificationSubscriptionStatus.ACCEPTED)
84         and: 'topic is polled'
85             def records = kafkaConsumer.poll(Duration.ofMillis(1500))
86         then: 'poll returns one record'
87             assert records.size() == 1
88             def record = records.iterator().next()
89         and: 'the record value matches the expected event value'
90             def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
91             assert expectedValue == record.value
92             assert eventKey == record.key
93     }
94
95     def 'Consume valid message.'() {
96         given: 'an event'
97             objectUnderTest.dmiName = 'test-ncmp-dmi'
98             def eventKey = UUID.randomUUID().toString()
99             def timestamp = new Timestamp(1679521929511)
100             def jsonData = TestUtils.getResourceFileContent('cmNotificationSubscriptionCreationEvent.json')
101             def subscriptionEvent = objectMapper.readValue(jsonData, CmNotificationSubscriptionDmiInEvent.class)
102             objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
103             def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
104                     .withType(subscriptionType)
105                     .withDataSchema(URI.create("urn:cps:" + CmNotificationSubscriptionDmiInEvent.class.getName() + ":1.0.0"))
106                     .withExtension("correlationid", eventKey)
107                     .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
108                     .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
109             def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
110         when: 'the valid event is consumed'
111             objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
112         then: 'no exception is thrown'
113             noExceptionThrown()
114         where: 'given #scenario'
115             scenario                    | subscriptionType
116             'Subscription Create Event' | "subscriptionCreated"
117             'Subscription Delete Event' | "subscriptionDeleted"
118     }
119
120     def 'Consume invalid message.'() {
121         given: 'an invalid event body'
122             objectUnderTest.dmiName = 'test-ncmp-dmi'
123             def eventKey = UUID.randomUUID().toString()
124             def timestamp = new Timestamp(1679521929511)
125             def invalidJsonBody = "/////"
126             objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
127             def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
128                     .withType("subscriptionCreated")
129                     .withDataSchema(URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0"))
130                     .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
131                     .withExtension("correlationid", eventKey).withData(objectMapper.writeValueAsBytes(invalidJsonBody)).build()
132             def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
133         when: 'the invalid event is consumed'
134             objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
135         then: 'exception is thrown and event is logged'
136             def loggingEvent = getLoggingEvent()
137             assert loggingEvent.level == Level.ERROR
138             assert loggingEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
139     }
140
141     def getLoggingEvent() {
142         return logger.list[0]
143     }
144 }