package org.onap.cps.ncmp.api.impl.async;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class NcmpAsyncDataOperationEventConsumer {
- private final EventsPublisher<DataOperationEvent> eventsPublisher;
+ private final EventsPublisher<CloudEvent> eventsPublisher;
/**
* Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
filter = "includeDataOperationEventsOnly",
groupId = "ncmp-data-operation-event-group",
properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
- public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
- dataOperationEventConsumerRecord) {
+ public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
- final String eventTarget = SerializationUtils
- .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
- final String eventId = SerializationUtils
- .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
- eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
- dataOperationEventConsumerRecord.value());
+ final String eventTarget = KafkaHeaders.getParsedKafkaHeader(
+ dataOperationEventConsumerRecord.headers(), "ce_destination");
+ final String eventId = KafkaHeaders.getParsedKafkaHeader(
+ dataOperationEventConsumerRecord.headers(), "ce_id");
+ eventsPublisher.publishCloudEvent(eventTarget, eventId, dataOperationEventConsumerRecord.value());
}
}