/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.CloudEventSerializer
import io.cloudevents.kafka.impl.KafkaHeaders
-import io.cloudevents.core.CloudEventUtils
import io.cloudevents.core.builder.CloudEventBuilder
-import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.header.internals.RecordHeaders
-import org.onap.cps.ncmp.api.impl.events.EventsPublisher
+import org.onap.cps.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.testcontainers.spock.Testcontainers
import java.time.Duration
-@SpringBootTest(classes = [EventsPublisher, DataOperationEventConsumer, RecordFilterStrategies,JsonObjectMapper, ObjectMapper])
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
+
+@SpringBootTest(classes = [EventsPublisher, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
@Testcontainers
@DirtiesContext
class DataOperationEventConsumerSpec extends MessagingBaseSpec {
@Autowired
RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy
- @Autowired
- ObjectMapper objectMapper
-
def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
def static clientTopic = 'client-topic'
def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
and: 'verify that extension is included into header'
assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
and: 'map consumer record to expected event type'
- def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
- PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
+ def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
and: 'verify published response data properties'
def response = dataOperationResponseEvent.data.responses[0]
response.operationId == 'some-operation-id'