Updating the Kafka listener compliance to could events and legacy
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / async / DataOperationEventConsumer.java
@@ -37,13 +37,13 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @RequiredArgsConstructor
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class NcmpAsyncDataOperationEventConsumer {
+public class DataOperationEventConsumer {
 
     private final EventsPublisher<CloudEvent> eventsPublisher;
 
     /**
-     * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
-     * and publish the same to the client specified topic.
+     * Consume the DataOperation cloud event published by producer to topic 'async-m2m.topic'
+     * and publish the same to client specified topic.
      *
      * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord.
      */
@@ -51,7 +51,7 @@ public class NcmpAsyncDataOperationEventConsumer {
             topics = "${app.ncmp.async-m2m.topic}",
             filter = "includeDataOperationEventsOnly",
             groupId = "ncmp-data-operation-event-group",
-            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
+            containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
     public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
         log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
         final String eventTarget = KafkaHeaders.getParsedKafkaHeader(