KafkaListener config for CloudEvents 54/135154/3
authormpriyank <priyank.maheshwari@est.tech>
Fri, 23 Jun 2023 14:01:38 +0000 (15:01 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Mon, 26 Jun 2023 11:53:00 +0000 (12:53 +0100)
- Introduced kafkalistener factory configs to correctly use the configs
  and convert the incoming events to CloudEvents
- Also legacy events to use a separate listener factory
- Increased the wait time from 100ms to 300ms to listen to the message
  in the test

Issue-ID: CPS-1764
Change-Id: I8b9fe0ba82ef87d52b7731941ccd3af8ae980109
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java [moved from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java with 81% similarity]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy [moved from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy with 96% similarity]

@@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -45,7 +46,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 @Configuration
 @EnableKafka
 @RequiredArgsConstructor
-public class KafkaTemplateConfig<T> {
+public class KafkaConfig<T> {
 
     private final KafkaProperties kafkaProperties;
 
@@ -75,6 +76,32 @@ public class KafkaTemplateConfig<T> {
         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
     }
 
+    /**
+     * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
+     *
+     * @return an instance of legacy Kafka template.
+     */
+    @Bean
+    @Primary
+    public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
+        final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+        kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
+        return kafkaTemplate;
+    }
+
+    /**
+     * A legacy concurrent kafka listener container factory.
+     *
+     * @return instance of Concurrent kafka listener factory
+     */
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
+        final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        containerFactory.setConsumerFactory(legacyEventConsumerFactory());
+        return containerFactory;
+    }
+
     /**
      * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
      * application.yml with CloudEventSerializer.
@@ -99,18 +126,6 @@ public class KafkaTemplateConfig<T> {
         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
     }
 
-    /**
-     * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
-     *
-     * @return an instance of legacy Kafka template.
-     */
-    @Bean
-    @Primary
-    public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
-        final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
-        kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
-        return kafkaTemplate;
-    }
 
     /**
      * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
@@ -124,4 +139,18 @@ public class KafkaTemplateConfig<T> {
         return kafkaTemplate;
     }
 
+    /**
+     * A Concurrent CloudEvent kafka listener container factory.
+     *
+     * @return instance of Concurrent kafka listener factory
+     */
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
+                                        cloudEventConcurrentKafkaListenerContainerFactory() {
+        final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        containerFactory.setConsumerFactory(cloudEventConsumerFactory());
+        return containerFactory;
+    }
+
 }
index b5ca176..88ebd35 100644 (file)
@@ -52,7 +52,8 @@ public class AvcEventConsumer {
      *
      * @param avcEventConsumerRecord Incoming raw consumer record
      */
-    @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+    @KafkaListener(topics = "${app.dmi.cm-events.topic}",
+            containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
     public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
         log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
         final String newEventId = UUID.randomUUID().toString();
index c0bdf3d..f577f55 100644 (file)
@@ -69,7 +69,7 @@ class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSp
             KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer))
             producer.send(record)
         and: 'wait a little for async processing of message'
-            TimeUnit.MILLISECONDS.sleep(100)
+            TimeUnit.MILLISECONDS.sleep(300)
         then: 'the event has only been forwarded for the correct type'
             expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(*_)
         where: 'the following event types are used'
@@ -85,7 +85,7 @@ class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSp
             KafkaProducer<String, String> producer = new KafkaProducer<>(eventProducerConfigProperties(StringSerializer))
             producer.send(record)
         and: 'wait a little for async processing of message'
-            TimeUnit.MILLISECONDS.sleep(100)
+            TimeUnit.MILLISECONDS.sleep(300)
         then: 'the event is not processed by this consumer'
             0 * mockEventsPublisher.publishCloudEvent(*_)
     }
@@ -34,10 +34,10 @@ import org.springframework.kafka.support.serializer.JsonSerializer
 import spock.lang.Shared
 import spock.lang.Specification
 
-@SpringBootTest(classes = [KafkaProperties, KafkaTemplateConfig])
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
 @EnableSharedInjection
 @EnableConfigurationProperties
-class KafkaTemplateConfigSpec extends Specification {
+class KafkaConfigSpec extends Specification {
 
     @Shared
     @Autowired