22852bea43e3392113b5ead062b24d4b2738514b
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / events / avc / AvcEventConsumerSpec.groovy
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (c) 2023 Nordix Foundation.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.api.impl.events.avc
22
23 import com.fasterxml.jackson.databind.ObjectMapper
24 import io.cloudevents.CloudEvent
25 import io.cloudevents.core.CloudEventUtils
26 import io.cloudevents.core.builder.CloudEventBuilder
27 import io.cloudevents.jackson.PojoCloudEventDataMapper
28 import io.cloudevents.kafka.CloudEventDeserializer
29 import io.cloudevents.kafka.impl.KafkaHeaders
30 import org.apache.kafka.clients.consumer.ConsumerRecord
31 import org.apache.kafka.clients.consumer.KafkaConsumer
32 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
33 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
34 import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
35 import org.onap.cps.ncmp.utils.TestUtils
36 import org.onap.cps.utils.JsonObjectMapper
37 import org.spockframework.spring.SpringBean
38 import org.springframework.beans.factory.annotation.Autowired
39 import org.springframework.boot.test.context.SpringBootTest
40 import org.springframework.test.annotation.DirtiesContext
41 import org.testcontainers.spock.Testcontainers
42 import java.time.Duration
43
44 @SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
45 @Testcontainers
46 @DirtiesContext
47 class AvcEventConsumerSpec extends MessagingBaseSpec {
48
49     @SpringBean
50     EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
51
52     @SpringBean
53     AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher)
54
55     @Autowired
56     JsonObjectMapper jsonObjectMapper
57
58     def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer))
59
60     def 'Consume and forward valid message'() {
61         given: 'consumer has a subscription on a topic'
62             def cmEventsTopicName = 'cm-events'
63             acvEventConsumer.cmEventsTopicName = cmEventsTopicName
64             cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
65         and: 'an event is sent'
66             def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
67             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
68             def testCloudEventSent = CloudEventBuilder.v1()
69                 .withData(jsonObjectMapper.asJsonBytes(testEventSent))
70                 .withId('sample-eventid')
71                 .withType('sample-test-type')
72                 .withSource(URI.create('sample-test-source'))
73                 .withExtension('correlationid', 'test-cmhandle1').build()
74         and: 'event has header information'
75             def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
76         when: 'the event is consumed'
77             acvEventConsumer.consumeAndForward(consumerRecord)
78         and: 'the topic is polled'
79             def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
80         then: 'poll returns one record'
81             assert records.size() == 1
82         and: 'record can be converted to AVC event'
83             def record = records.iterator().next()
84             def cloudEvent = record.value() as CloudEvent
85             def convertedAvcEvent = CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), AvcEvent.class)).getValue()
86         and: 'we have correct headers forwarded where correlation id matches'
87             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
88         and: 'event id differs(as per requirement) between consumed and forwarded'
89             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid'
90         and: 'the event payload still matches'
91             assert testEventSent == convertedAvcEvent
92     }
93
94 }