X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fevents%2Favc%2FAvcEventConsumer.java;h=3bf02f0b587ceb67ef329bb15a9733f795255035;hb=e626c9661fd88a585b50dafab5f5542784690143;hp=83ad5e5704e517727e335e9fefc88368ed7ce01f;hpb=36d6e83473846cd4e0ad5e42ad6a8d0e3e3b3425;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index 83ad5e570..3bf02f0b5 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,14 +20,19 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,16 +52,28 @@ public class AvcEventConsumer { /** - * Consume the specified event. + * Incoming AvcEvent in the form of Consumer Record. * - * @param avcEvent the event to be consumed and produced. + * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener( - topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"}) - public void consumeAndForward(final AvcEvent avcEvent) { - log.debug("Consuming AVC event {} ...", avcEvent); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent); - eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent); + @KafkaListener(topics = "${app.dmi.cm-events.topic}", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) + public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { + log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); + final String mutatedEventId = UUID.randomUUID().toString(); + mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); + final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); + eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), + outgoingAvcEvent); + } + + private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { + final String existingEventId = + (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value()); + eventHeaders.remove("eventId"); + log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, + mutatedEventId); + eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId))); + } }