log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
final String eventTarget = KafkaHeaders.getParsedKafkaHeader(
dataOperationEventConsumerRecord.headers(), "ce_destination");
- final String eventId = KafkaHeaders.getParsedKafkaHeader(
- dataOperationEventConsumerRecord.headers(), "ce_id");
- eventsPublisher.publishCloudEvent(eventTarget, eventId, dataOperationEventConsumerRecord.value());
+ final String correlationId = KafkaHeaders.getParsedKafkaHeader(
+ dataOperationEventConsumerRecord.headers(), "ce_correlationid");
+ eventsPublisher.publishCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value());
}
}