Merge "Cm Subscription: Predicates optional now"
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / events / avc / AvcEventConsumerSpec.groovy
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Copyright (c) 2023-2024 Nordix Foundation.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an 'AS IS' BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.api.impl.events.avc
22
23 import com.fasterxml.jackson.databind.ObjectMapper
24 import io.cloudevents.CloudEvent
25 import io.cloudevents.core.builder.CloudEventBuilder
26 import io.cloudevents.kafka.CloudEventDeserializer
27 import io.cloudevents.kafka.impl.KafkaHeaders
28 import org.apache.kafka.clients.consumer.ConsumerRecord
29 import org.apache.kafka.clients.consumer.KafkaConsumer
30 import org.onap.cps.events.EventsPublisher
31 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
32 import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
33 import org.onap.cps.ncmp.utils.TestUtils
34 import org.onap.cps.utils.JsonObjectMapper
35 import org.spockframework.spring.SpringBean
36 import org.springframework.beans.factory.annotation.Autowired
37 import org.springframework.boot.test.context.SpringBootTest
38 import org.springframework.test.annotation.DirtiesContext
39 import org.testcontainers.spock.Testcontainers
40 import java.time.Duration
41
42 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
43
44 @SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
45 @Testcontainers
46 @DirtiesContext
47 class AvcEventConsumerSpec extends MessagingBaseSpec {
48
49     @SpringBean
50     EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
51
52     @SpringBean
53     AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher)
54
55     @Autowired
56     JsonObjectMapper jsonObjectMapper
57
58     def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer))
59
60     def 'Consume and forward valid message'() {
61         given: 'consumer has a subscription on a topic'
62             def cmEventsTopicName = 'cm-events'
63             acvEventConsumer.cmEventsTopicName = cmEventsTopicName
64             cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
65         and: 'an event is sent'
66             def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
67             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
68             def testCloudEventSent = CloudEventBuilder.v1()
69                 .withData(jsonObjectMapper.asJsonBytes(testEventSent))
70                 .withId('sample-eventid')
71                 .withType('sample-test-type')
72                 .withSource(URI.create('sample-test-source'))
73                 .withExtension('correlationid', 'test-cmhandle1').build()
74         and: 'event has header information'
75             def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
76         when: 'the event is consumed'
77             acvEventConsumer.consumeAndForward(consumerRecord)
78         and: 'the topic is polled'
79             def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
80         then: 'poll returns one record'
81             assert records.size() == 1
82         and: 'record can be converted to AVC event'
83             def record = records.iterator().next()
84             def cloudEvent = record.value() as CloudEvent
85             def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class)
86         and: 'we have correct headers forwarded where correlation id matches'
87             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
88         and: 'event id differs(as per requirement) between consumed and forwarded'
89             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid'
90         and: 'the event payload still matches'
91             assert testEventSent == convertedAvcEvent
92     }
93
94 }