- @KafkaListener(
- topics = "${app.dmi.cm-events.topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"})
- public void consumeAndForward(final AvcEvent avcEvent) {
- log.debug("Consuming AVC event {} ...", avcEvent);
- final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent);
- eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent);
+ @KafkaListener(topics = "${app.dmi.cm-events.topic}",
+ properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"})
+ public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) {
+ log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
+ final String mutatedEventId = UUID.randomUUID().toString();
+ mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId);
+ final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value());
+ eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(),
+ outgoingAvcEvent);
+ }
+
+ private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
+ final String existingEventId =
+ (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value());
+ eventHeaders.remove("eventId");
+ log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
+ mutatedEventId);
+ eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId)));
+