package org.onap.cps.ncmp.api.impl.events.avc;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeader;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.avc.v1.AvcEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
-import org.springframework.util.SerializationUtils;
/**
* Listener for AVC events.
@Value("${app.ncmp.avc.cm-events-topic}")
private String cmEventsTopicName;
- private final EventsPublisher<AvcEvent> eventsPublisher;
- private final AvcEventMapper avcEventMapper;
-
+ private final EventsPublisher<CloudEvent> eventsPublisher;
/**
* Incoming AvcEvent in the form of Consumer Record.
*
* @param avcEventConsumerRecord Incoming raw consumer record
*/
- @KafkaListener(topics = "${app.dmi.cm-events.topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"})
- public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) {
+ @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+ public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
- final String mutatedEventId = UUID.randomUUID().toString();
- mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId);
- final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value());
- eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(),
- outgoingAvcEvent);
- }
-
- private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
- final String eventId = "eventId";
- final String existingEventId =
- (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value());
- eventHeaders.remove(eventId);
- log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
- mutatedEventId);
- eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId)));
-
+ final String newEventId = UUID.randomUUID().toString();
+ final CloudEvent outgoingAvcEvent =
+ CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+ eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
}
}