package org.onap.cps.ncmp.api.impl.events.avc;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.event.model.AvcEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
@Value("${app.ncmp.avc.cm-events-topic}")
private String cmEventsTopicName;
- private final EventsPublisher<AvcEvent> eventsPublisher;
- private final AvcEventMapper avcEventMapper;
-
+ private final EventsPublisher<CloudEvent> eventsPublisher;
/**
- * Consume the specified event.
+ * Incoming AvcEvent in the form of Consumer Record.
*
- * @param avcEvent the event to be consumed and produced.
+ * @param avcEventConsumerRecord Incoming raw consumer record
*/
- @KafkaListener(
- topics = "${app.dmi.cm-events.topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"})
- public void consumeAndForward(final AvcEvent avcEvent) {
- log.debug("Consuming AVC event {} ...", avcEvent);
- final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEvent);
- eventsPublisher.publishEvent(cmEventsTopicName, outgoingAvcEvent.getEventId(), outgoingAvcEvent);
+ @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+ public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
+ log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
+ final String newEventId = UUID.randomUUID().toString();
+ final CloudEvent outgoingAvcEvent =
+ CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+ eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
}
}