X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fasync%2FNcmpAsyncDataOperationEventConsumer.java;fp=cps-ncmp-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fcps%2Fncmp%2Fapi%2Fimpl%2Fasync%2FNcmpAsyncBatchEventConsumer.java;h=995a4d5a6743f1c2a74bbc5a4843e5e2fd35772d;hb=f4c3f0fcebec726ea74b44f9bca3b68e66176671;hp=2a332d0037e9e35ae97b08da6065007b0ac12ef6;hpb=08898672e0de04238acdedea4266c58f17c2b7e0;p=cps.git diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java similarity index 60% rename from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java rename to cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java index 2a332d003..995a4d5a6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java @@ -25,40 +25,41 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** - * Listener for cps-ncmp async batch events. + * Listener for cps-ncmp async data operation events. */ @Component @Slf4j @RequiredArgsConstructor @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class NcmpAsyncBatchEventConsumer { +public class NcmpAsyncDataOperationEventConsumer { - private final EventsPublisher eventsPublisher; + private final EventsPublisher eventsPublisher; /** - * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic' + * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic' * and publish the same to the client specified topic. * - * @param batchEventConsumerRecord consuming event as a ConsumerRecord. + * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord. */ @KafkaListener( topics = "${app.ncmp.async-m2m.topic}", - filter = "filterBatchDataResponseEvent", - groupId = "ncmp-batch-event-group", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"}) - public void consumeAndPublish(final ConsumerRecord batchEventConsumerRecord) { - log.info("Consuming event payload {} ...", batchEventConsumerRecord.value()); + filter = "includeDataOperationEventsOnly", + groupId = "ncmp-data-operation-event-group", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"}) + public void consumeAndPublish(final ConsumerRecord + dataOperationEventConsumerRecord) { + log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value()); final String eventTarget = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value()); final String eventId = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value()); - eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(), - batchEventConsumerRecord.value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value()); + eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(), + dataOperationEventConsumerRecord.value()); } }