2  *  ============LICENSE_START=======================================================
 
   3  *  Copyright (c) 2023-2024 Nordix Foundation.
 
   4  *  ================================================================================
 
   5  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   6  *  you may not use this file except in compliance with the License.
 
   7  *  You may obtain a copy of the License at
 
   9  *        http://www.apache.org/licenses/LICENSE-2.0
 
  11  *  Unless required by applicable law or agreed to in writing, software
 
  12  *  distributed under the License is distributed on an 'AS IS' BASIS,
 
  13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  *  See the License for the specific language governing permissions and
 
  15  *  limitations under the License.
 
  17  *  SPDX-License-Identifier: Apache-2.0
 
  18  *  ============LICENSE_END=========================================================
 
  21 package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc
 
  23 import com.fasterxml.jackson.databind.ObjectMapper
 
  24 import io.cloudevents.CloudEvent
 
  25 import io.cloudevents.core.builder.CloudEventBuilder
 
  26 import io.cloudevents.kafka.CloudEventDeserializer
 
  27 import io.cloudevents.kafka.impl.KafkaHeaders
 
  28 import org.apache.kafka.clients.consumer.ConsumerRecord
 
  29 import org.apache.kafka.clients.consumer.KafkaConsumer
 
  30 import org.onap.cps.events.EventsPublisher
 
  31 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 
  32 import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
 
  33 import org.onap.cps.ncmp.utils.TestUtils
 
  34 import org.onap.cps.utils.JsonObjectMapper
 
  35 import org.spockframework.spring.SpringBean
 
  36 import org.springframework.beans.factory.annotation.Autowired
 
  37 import org.springframework.boot.test.context.SpringBootTest
 
  38 import org.springframework.test.annotation.DirtiesContext
 
  39 import org.testcontainers.spock.Testcontainers
 
  41 import java.time.Duration
 
  43 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
  45 @SpringBootTest(classes = [EventsPublisher, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
 
  48 class CmAvcEventConsumerSpec extends MessagingBaseSpec {
 
  51     EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
  54     CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsPublisher)
 
  57     JsonObjectMapper jsonObjectMapper
 
  59     def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer))
 
  61     def 'Consume and forward valid message'() {
 
  62         given: 'consumer has a subscription on a topic'
 
  63             def cmEventsTopicName = 'cm-events'
 
  64             acvEventConsumer.cmEventsTopicName = cmEventsTopicName
 
  65             cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
 
  66         and: 'an event is sent'
 
  67             def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
 
  68             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
 
  69             def testCloudEventSent = CloudEventBuilder.v1()
 
  70                 .withData(jsonObjectMapper.asJsonBytes(testEventSent))
 
  71                 .withId('sample-eventid')
 
  72                 .withType('sample-test-type')
 
  73                 .withSource(URI.create('sample-test-source'))
 
  74                 .withExtension('correlationid', 'test-cmhandle1').build()
 
  75         and: 'event has header information'
 
  76             def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
 
  77         when: 'the event is consumed'
 
  78             acvEventConsumer.consumeAndForward(consumerRecord)
 
  79         and: 'the topic is polled'
 
  80             def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
 
  81         then: 'poll returns one record'
 
  82             assert records.size() == 1
 
  83         and: 'record can be converted to AVC event'
 
  84             def record = records.iterator().next()
 
  85             def cloudEvent = record.value() as CloudEvent
 
  86             def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class)
 
  87         and: 'we have correct headers forwarded where correlation id matches'
 
  88             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
 
  89         and: 'event id differs(as per requirement) between consumed and forwarded'
 
  90             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid'
 
  91         and: 'the event payload still matches'
 
  92             assert testEventSent == convertedAvcEvent