aa331c4bd7f6fd87d2739222b11154088ad52630
[cps/ncmp-dmi-plugin.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2024 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription
22
23 import ch.qos.logback.classic.Level
24 import ch.qos.logback.classic.Logger
25 import ch.qos.logback.classic.spi.ILoggingEvent
26 import ch.qos.logback.core.read.ListAppender
27 import com.fasterxml.jackson.databind.ObjectMapper
28 import io.cloudevents.CloudEvent
29 import io.cloudevents.core.builder.CloudEventBuilder
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.onap.cps.ncmp.dmi.TestUtils
32 import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
33 import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus
34 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper
35 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
36 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
37 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent
38 import org.slf4j.LoggerFactory
39 import org.spockframework.spring.SpringBean
40 import org.springframework.boot.test.context.SpringBootTest
41 import org.springframework.test.annotation.DirtiesContext
42 import org.testcontainers.spock.Testcontainers
43
44 import java.sql.Timestamp
45 import java.time.Duration
46 import java.time.OffsetDateTime
47 import java.time.ZoneId
48
49
50 @SpringBootTest(classes = [CmNotificationSubscriptionDmiInEventConsumer])
51 @Testcontainers
52 @DirtiesContext
53 class CmNotificationSubscriptionDmiInEventConsumerSpec extends MessagingBaseSpec {
54     def objectMapper = new ObjectMapper()
55     def testTopic = 'dmi-ncmp-cm-avc-subscription'
56
57     @SpringBean
58     CmNotificationSubscriptionDmiInEventConsumer objectUnderTest = new CmNotificationSubscriptionDmiInEventConsumer(cloudEventKafkaTemplate)
59
60     def logger = Spy(ListAppender<ILoggingEvent>)
61
62     void setup() {
63         ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).addAppender(logger)
64         logger.start()
65     }
66
67     void cleanup() {
68         ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders()
69     }
70
71     def 'Sends subscription cloud event response successfully.'() {
72         given: 'an subscription event response'
73             objectUnderTest.dmiName = 'test-ncmp-dmi'
74             objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
75             def correlationId = 'test-subscriptionId#test-ncmp-dmi'
76             def cmSubscriptionDmiOutEventData = new Data(statusCode: subscriptionStatusCode, statusMessage: subscriptionStatusMessage)
77             def subscriptionEventResponse =
78                     new CmNotificationSubscriptionDmiOutEvent().withData(cmSubscriptionDmiOutEventData)
79         and: 'consumer has a subscription'
80             kafkaConsumer.subscribe([testTopic] as List<String>)
81         when: 'an event is published'
82             def eventKey = UUID.randomUUID().toString()
83             objectUnderTest.createAndSendCmNotificationSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus", correlationId, subscriptionAcceptanceType)
84         and: 'topic is polled'
85             def records = kafkaConsumer.poll(Duration.ofMillis(1500))
86         then: 'poll returns one record and close kafkaConsumer'
87             assert records.size() == 1
88             def record = records.iterator().next()
89             kafkaConsumer.close()
90         and: 'the record value matches the expected event value'
91             def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
92             assert expectedValue == record.value
93             assert eventKey == record.key
94         where: 'given #scenario'
95             scenario                   | subscriptionAcceptanceType                 | subscriptionStatusCode | subscriptionStatusMessage
96             'Subscription is Accepted' | CmNotificationSubscriptionStatus.ACCEPTED  | '1'                    | 'ACCEPTED'
97             'Subscription is Rejected' | CmNotificationSubscriptionStatus.REJECTED  | '2'                    | 'REJECTED'
98     }
99
100     def 'Consume valid message.'() {
101         given: 'an event'
102             objectUnderTest.dmiName = 'test-ncmp-dmi'
103             def eventKey = UUID.randomUUID().toString()
104             def timestamp = new Timestamp(1679521929511)
105             def jsonData = TestUtils.getResourceFileContent('cmNotificationSubscriptionCreationEvent.json')
106             def subscriptionEvent = objectMapper.readValue(jsonData, CmNotificationSubscriptionDmiInEvent.class)
107             objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
108             def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
109                     .withType(subscriptionType)
110                     .withDataSchema(URI.create("urn:cps:" + CmNotificationSubscriptionDmiInEvent.class.getName() + ":1.0.0"))
111                     .withExtension("correlationid", eventKey)
112                     .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
113                     .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
114             def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
115         when: 'the valid event is consumed'
116             objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
117         then: 'no exception is thrown'
118             noExceptionThrown()
119         where: 'given #scenario'
120             scenario                    | subscriptionType
121             'Subscription Create Event' | "subscriptionCreated"
122             'Subscription Delete Event' | "subscriptionDeleted"
123     }
124
125     def 'Consume invalid message.'() {
126         given: 'an invalid event body'
127             objectUnderTest.dmiName = 'test-ncmp-dmi'
128             def eventKey = UUID.randomUUID().toString()
129             def timestamp = new Timestamp(1679521929511)
130             def invalidJsonBody = "/////"
131             objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
132             def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
133                     .withType("subscriptionCreated")
134                     .withDataSchema(URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0"))
135                     .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
136                     .withExtension("correlationid", eventKey).withData(objectMapper.writeValueAsBytes(invalidJsonBody)).build()
137             def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
138         when: 'the invalid event is consumed'
139             objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
140         then: 'exception is thrown and event is logged'
141             def loggingEvent = getLoggingEvent()
142             assert loggingEvent.level == Level.ERROR
143             assert loggingEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
144     }
145
146     def getLoggingEvent() {
147         return logger.list[0]
148     }
149 }