DMI Data AVC to cloud events
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / events / avc / AvcEventConsumerSpec.groovy
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (c) 2023 Nordix Foundation.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.api.impl.events.avc
22
23 import com.fasterxml.jackson.databind.ObjectMapper
24 import io.cloudevents.CloudEvent
25 import io.cloudevents.core.CloudEventUtils
26 import io.cloudevents.core.builder.CloudEventBuilder
27 import io.cloudevents.jackson.PojoCloudEventDataMapper
28 import io.cloudevents.kafka.CloudEventDeserializer
29 import io.cloudevents.kafka.impl.KafkaHeaders
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.apache.kafka.clients.consumer.KafkaConsumer
32 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
33 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
34 import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
35 import org.onap.cps.ncmp.utils.TestUtils
36 import org.onap.cps.utils.JsonObjectMapper
37 import org.spockframework.spring.SpringBean
38 import org.springframework.beans.factory.annotation.Autowired
39 import org.springframework.boot.test.context.SpringBootTest
40 import org.springframework.test.annotation.DirtiesContext
41 import org.testcontainers.spock.Testcontainers
42
43 import java.time.Duration
44
45 @SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
46 @Testcontainers
47 @DirtiesContext
48 class AvcEventConsumerSpec extends MessagingBaseSpec {
49
50     @SpringBean
51     EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
52
53     @SpringBean
54     AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher)
55
56     @Autowired
57     JsonObjectMapper jsonObjectMapper
58
59     @Autowired
60     ObjectMapper objectMapper
61
62     def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer))
63
64     def 'Consume and forward valid message'() {
65         given: 'consumer has a subscription on a topic'
66             def cmEventsTopicName = 'cm-events'
67             acvEventConsumer.cmEventsTopicName = cmEventsTopicName
68             cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
69         and: 'an event is sent'
70             def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
71             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
72             def testCloudEventSent = CloudEventBuilder.v1()
73                 .withData(objectMapper.writeValueAsBytes(testEventSent))
74                 .withId('sample-eventid')
75                 .withType('sample-test-type')
76                 .withSource(URI.create('sample-test-source'))
77                 .withExtension('correlationid', 'test-cmhandle1').build()
78         and: 'event has header information'
79             def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
80         when: 'the event is consumed'
81             acvEventConsumer.consumeAndForward(consumerRecord)
82         and: 'the topic is polled'
83             def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
84         then: 'poll returns one record'
85             assert records.size() == 1
86         and: 'record can be converted to AVC event'
87             def record = records.iterator().next()
88             def cloudevent = record.value() as CloudEvent
89             def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
90         and: 'we have correct headers forwarded where correlation id matches'
91             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
92         and: 'event id differs(as per requirement) between consumed and forwarded'
93             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid'
94         and: 'the event payload still matches'
95             assert testEventSent == convertedAvcEvent
96     }
97
98 }