2 * ============LICENSE_START=======================================================
3 * Copyright (c) 2023-2024 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an 'AS IS' BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.api.impl.events.avc
23 import com.fasterxml.jackson.databind.ObjectMapper
24 import io.cloudevents.CloudEvent
25 import io.cloudevents.core.builder.CloudEventBuilder
26 import io.cloudevents.kafka.CloudEventDeserializer
27 import io.cloudevents.kafka.impl.KafkaHeaders
28 import org.apache.kafka.clients.consumer.ConsumerRecord
29 import org.apache.kafka.clients.consumer.KafkaConsumer
30 import org.onap.cps.events.EventsPublisher
31 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
32 import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
33 import org.onap.cps.ncmp.utils.TestUtils
34 import org.onap.cps.utils.JsonObjectMapper
35 import org.spockframework.spring.SpringBean
36 import org.springframework.beans.factory.annotation.Autowired
37 import org.springframework.boot.test.context.SpringBootTest
38 import org.springframework.test.annotation.DirtiesContext
39 import org.testcontainers.spock.Testcontainers
40 import java.time.Duration
42 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
44 @SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
47 class AvcEventConsumerSpec extends MessagingBaseSpec {
50 EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
53 AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher)
56 JsonObjectMapper jsonObjectMapper
58 def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer))
60 def 'Consume and forward valid message'() {
61 given: 'consumer has a subscription on a topic'
62 def cmEventsTopicName = 'cm-events'
63 acvEventConsumer.cmEventsTopicName = cmEventsTopicName
64 cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
65 and: 'an event is sent'
66 def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
67 def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
68 def testCloudEventSent = CloudEventBuilder.v1()
69 .withData(jsonObjectMapper.asJsonBytes(testEventSent))
70 .withId('sample-eventid')
71 .withType('sample-test-type')
72 .withSource(URI.create('sample-test-source'))
73 .withExtension('correlationid', 'test-cmhandle1').build()
74 and: 'event has header information'
75 def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
76 when: 'the event is consumed'
77 acvEventConsumer.consumeAndForward(consumerRecord)
78 and: 'the topic is polled'
79 def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
80 then: 'poll returns one record'
81 assert records.size() == 1
82 and: 'record can be converted to AVC event'
83 def record = records.iterator().next()
84 def cloudEvent = record.value() as CloudEvent
85 def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class)
86 and: 'we have correct headers forwarded where correlation id matches'
87 assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
88 and: 'event id differs(as per requirement) between consumed and forwarded'
89 assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid'
90 and: 'the event payload still matches'
91 assert testEventSent == convertedAvcEvent