From: mpriyank Date: Fri, 23 Jun 2023 14:01:38 +0000 (+0100) Subject: KafkaListener config for CloudEvents X-Git-Tag: 3.3.3~15^2 X-Git-Url: https://gerrit.onap.org/r/gitweb?p=cps.git;a=commitdiff_plain;h=93fa06429f8d8e3d5bce25ae4b338156f428888a KafkaListener config for CloudEvents - Introduced kafkalistener factory configs to correctly use the configs and convert the incoming events to CloudEvents - Also legacy events to use a separate listener factory - Increased the wait time from 100ms to 300ms to listen to the message in the test Issue-ID: CPS-1764 Change-Id: I8b9fe0ba82ef87d52b7731941ccd3af8ae980109 Signed-off-by: mpriyank --- diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java similarity index 81% rename from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java rename to cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java index b76f86ebe..514967574 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java @@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -45,7 +46,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer; @Configuration @EnableKafka @RequiredArgsConstructor -public class KafkaTemplateConfig { +public class KafkaConfig { private final KafkaProperties kafkaProperties; @@ -75,6 +76,32 @@ public class KafkaTemplateConfig { return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); } + /** + * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this. + * + * @return an instance of legacy Kafka template. + */ + @Bean + @Primary + public KafkaTemplate legacyEventKafkaTemplate() { + final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); + kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory()); + return kafkaTemplate; + } + + /** + * A legacy concurrent kafka listener container factory. + * + * @return instance of Concurrent kafka listener factory + */ + @Bean + public ConcurrentKafkaListenerContainerFactory legacyEventConcurrentKafkaListenerContainerFactory() { + final ConcurrentKafkaListenerContainerFactory containerFactory = + new ConcurrentKafkaListenerContainerFactory<>(); + containerFactory.setConsumerFactory(legacyEventConsumerFactory()); + return containerFactory; + } + /** * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into * application.yml with CloudEventSerializer. @@ -99,18 +126,6 @@ public class KafkaTemplateConfig { return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); } - /** - * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this. - * - * @return an instance of legacy Kafka template. - */ - @Bean - @Primary - public KafkaTemplate legacyEventKafkaTemplate() { - final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); - kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory()); - return kafkaTemplate; - } /** * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this. @@ -124,4 +139,18 @@ public class KafkaTemplateConfig { return kafkaTemplate; } + /** + * A Concurrent CloudEvent kafka listener container factory. + * + * @return instance of Concurrent kafka listener factory + */ + @Bean + public ConcurrentKafkaListenerContainerFactory + cloudEventConcurrentKafkaListenerContainerFactory() { + final ConcurrentKafkaListenerContainerFactory containerFactory = + new ConcurrentKafkaListenerContainerFactory<>(); + containerFactory.setConsumerFactory(cloudEventConsumerFactory()); + return containerFactory; + } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index b5ca176d1..88ebd35c8 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -52,7 +52,8 @@ public class AvcEventConsumer { * * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener(topics = "${app.dmi.cm-events.topic}") + @KafkaListener(topics = "${app.dmi.cm-events.topic}", + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); final String newEventId = UUID.randomUUID().toString(); diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy index c0bdf3d1d..f577f55ba 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy @@ -69,7 +69,7 @@ class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSp KafkaProducer producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer)) producer.send(record) and: 'wait a little for async processing of message' - TimeUnit.MILLISECONDS.sleep(100) + TimeUnit.MILLISECONDS.sleep(300) then: 'the event has only been forwarded for the correct type' expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(*_) where: 'the following event types are used' @@ -85,7 +85,7 @@ class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSp KafkaProducer producer = new KafkaProducer<>(eventProducerConfigProperties(StringSerializer)) producer.send(record) and: 'wait a little for async processing of message' - TimeUnit.MILLISECONDS.sleep(100) + TimeUnit.MILLISECONDS.sleep(300) then: 'the event is not processed by this consumer' 0 * mockEventsPublisher.publishCloudEvent(*_) } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy similarity index 96% rename from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy rename to cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy index ed5f16125..d5b091552 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy @@ -34,10 +34,10 @@ import org.springframework.kafka.support.serializer.JsonSerializer import spock.lang.Shared import spock.lang.Specification -@SpringBootTest(classes = [KafkaProperties, KafkaTemplateConfig]) +@SpringBootTest(classes = [KafkaProperties, KafkaConfig]) @EnableSharedInjection @EnableConfigurationProperties -class KafkaTemplateConfigSpec extends Specification { +class KafkaConfigSpec extends Specification { @Shared @Autowired