Implement Exactly Once Semantics for CmAvcEvents flow  (Gerrit ref 83/142183/9)
author     mpriyank <priyank.maheshwari@est.tech>
           Wed, 24 Sep 2025 15:45:13 +0000 (16:45 +0100)
committer  mpriyank <priyank.maheshwari@est.tech>
           Wed, 29 Oct 2025 06:03:44 +0000 (06:03 +0000)
- introduced exactly-once processing for the CM AVC event forwarding
  flow
- corresponding unit tests and integration tests added
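- a minimal sketch of the consume-transform-produce pattern this change wires up
  (read_committed consumer, transactional CloudEvent producer, and the listener's offset
  commit joined to the producer transaction); the topic names and class name below are
  illustrative assumptions, not the production code:

      import io.cloudevents.CloudEvent;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;
      import org.springframework.transaction.annotation.Transactional;

      @Component
      class ExactlyOnceForwarderSketch {

          private final KafkaTemplate<String, CloudEvent> transactionalTemplate;

          ExactlyOnceForwarderSketch(final KafkaTemplate<String, CloudEvent> transactionalTemplate) {
              this.transactionalTemplate = transactionalTemplate;
          }

          // With a KafkaAwareTransactionManager on the listener container, the consumed offset is
          // committed in the same Kafka transaction as the send below; if anything in between fails,
          // the transaction aborts and the record is redelivered rather than lost.
          @Transactional
          @KafkaListener(topics = "dmi-cm-events",
              containerFactory = "cloudEventConcurrentKafkaListenerContainerFactoryForEos")
          public void forward(final ConsumerRecord<String, CloudEvent> consumerRecord) {
              transactionalTemplate.send("cm-events", consumerRecord.key(), consumerRecord.value());
          }
      }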

Issue-ID: CPS-2984
Change-Id: I4eea7c15fe9c2ebbdb441bff669ced365d17510d
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
23 files changed:
cps-application/src/main/resources/application.yml
cps-application/src/test/resources/application.yml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy
cps-service/src/main/java/org/onap/cps/events/EventsProducer.java
cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy
docker-compose/docker-compose.yml
integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/data/LegacyBatchDataOperationSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy
integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java
integration-test/src/test/resources/application.yml

index 90e5ac1..bfc5339 100644 (file)
@@ -240,6 +240,10 @@ ncmp:
     model-loader:
         maximum-attempt-count: 20
 
+    notifications:
+        avc-event-producer:
+            transaction-id-prefix: tx-
+
 # Custom Hazelcast Config.
 hazelcast:
     cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
index 0bb43b0..107facb 100644 (file)
@@ -237,6 +237,10 @@ ncmp:
     model-loader:
         maximum-attempt-count: 20
 
+    notifications:
+        avc-event-producer:
+            transaction-id-prefix: test-tx-
+
 # Custom Hazelcast Config.
 hazelcast:
     cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java
new file mode 100644 (file)
index 0000000..33dfada
--- /dev/null
@@ -0,0 +1,165 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.config;
+
+import io.cloudevents.CloudEvent;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
+import java.time.Duration;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.ssl.SslBundles;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.KafkaException;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
+import org.springframework.kafka.transaction.KafkaTransactionManager;
+
+/**
+ * Kafka configuration for implementing exactly-once semantics using cloud events.
+ */
+@Configuration
+@EnableKafka
+@RequiredArgsConstructor
+public class ExactlyOnceSemanticsKafkaConfig {
+
+    private final KafkaProperties kafkaProperties;
+
+    @Value("${cps.tracing.enabled:false}")
+    private boolean tracingEnabled;
+
+    @Value("${ncmp.notifications.avc-event-producer.transaction-id-prefix:tx-}")
+    private String transactionIdPrefixForExactlyOnceSemantics;
+
+    private static final SslBundles NO_SSL = null;
+
+
+    /**
+     * Sets the strategy for creating a cloud event Kafka producer from the Kafka properties defined in
+     * application.yml, using CloudEventSerializer. This factory is configured to support
+     * exactly-once semantics by enabling idempotence and setting a transaction ID prefix.
+     *
+     * @return cloud event producer factory instance.
+     */
+    @Bean
+    public ProducerFactory<String, CloudEvent> cloudEventProducerFactoryForEos() {
+        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
+        producerConfigProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        producerConfigProperties.put(ProducerConfig.ACKS_CONFIG, "all");
+        if (tracingEnabled) {
+            producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    TracingProducerInterceptor.class.getName());
+        }
+        final DefaultKafkaProducerFactory<String, CloudEvent> defaultKafkaProducerFactory =
+                new DefaultKafkaProducerFactory<>(producerConfigProperties);
+        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefixForExactlyOnceSemantics);
+        return defaultKafkaProducerFactory;
+    }
+
+    /**
+     * The ConsumerFactory strategy to create a cloud event consumer for the Kafka properties defined
+     * in application.yml, using CloudEventDeserializer as the value deserializer. This factory is configured
+     * with read_committed isolation level to support exactly-once semantics.
+     *
+     * @return an instance of cloud event consumer factory.
+     */
+    @Bean
+    public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactoryForEos() {
+        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
+        consumerConfigProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        if (tracingEnabled) {
+            consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    TracingConsumerInterceptor.class.getName());
+        }
+        return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+    }
+
+
+    /**
+     * A cloud Kafka event template for executing high-level operations. The template is configured using the cloud
+     * event producer and consumer factories to support
+     * exactly-once semantics.
+     *
+     * @return an instance of cloud Kafka template.
+     */
+    @Bean(name = "cloudEventKafkaTemplateForEos")
+    public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos() {
+        final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactoryForEos());
+        kafkaTemplate.setConsumerFactory(cloudEventConsumerFactoryForEos());
+        if (tracingEnabled) {
+            kafkaTemplate.setObservationEnabled(true);
+        }
+        return kafkaTemplate;
+    }
+
+    /**
+     * A concurrent CloudEvent Kafka listener container factory.
+     * This factory supports exactly-once semantics, retry handling, and optional tracing.
+     *
+     * @return instance of Concurrent kafka listener factory
+     */
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
+                            cloudEventConcurrentKafkaListenerContainerFactoryForEos() {
+        final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        containerFactory.setConsumerFactory(cloudEventConsumerFactoryForEos());
+        containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
+        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
+        containerFactory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManagerForEos());
+        containerFactory.setCommonErrorHandler(kafkaErrorHandlerWithMaxRetriesForEos());
+        if (tracingEnabled) {
+            containerFactory.getContainerProperties().setObservationEnabled(true);
+        }
+
+        return containerFactory;
+    }
+
+    private KafkaTransactionManager<String, CloudEvent> kafkaTransactionManagerForEos() {
+        return new KafkaTransactionManager<>(cloudEventProducerFactoryForEos());
+    }
+
+    private DefaultErrorHandler kafkaErrorHandlerWithMaxRetriesForEos() {
+
+        final ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries =
+                new ExponentialBackOffWithMaxRetries(Integer.MAX_VALUE);
+        exponentialBackOffWithMaxRetries.setInitialInterval(1000L); // 1 sec
+        exponentialBackOffWithMaxRetries.setMultiplier(2.0);
+        exponentialBackOffWithMaxRetries.setMaxInterval(30_000L); // 30 sec
+        final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(exponentialBackOffWithMaxRetries);
+        defaultErrorHandler.addRetryableExceptions(KafkaException.class);
+
+        return defaultErrorHandler;
+    }
+}
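A hedged usage sketch (not part of this change): because cloudEventProducerFactoryForEos sets a
transaction-id prefix, sends through cloudEventKafkaTemplateForEos are expected to run inside a
Kafka transaction - either the listener container's transaction, as CmAvcEventConsumer does below,
or an explicit executeInTransaction block. The topic, key and event attributes here are
illustrative assumptions.

      import io.cloudevents.CloudEvent;
      import io.cloudevents.core.builder.CloudEventBuilder;
      import java.net.URI;
      import java.util.UUID;
      import org.springframework.kafka.core.KafkaTemplate;

      class EosTemplateUsageSketch {

          // executeInTransaction begins, commits or aborts a producer transaction around the send,
          // so it works even when no listener-managed transaction is active on the calling thread.
          static void sendTransactionally(final KafkaTemplate<String, CloudEvent> eosTemplate) {
              final CloudEvent event = CloudEventBuilder.v1()
                      .withId(UUID.randomUUID().toString())
                      .withType("org.example.cm.avc.event")
                      .withSource(URI.create("urn:example:eos-sketch"))
                      .build();
              eosTemplate.executeInTransaction(operations -> operations.send("cm-events", "some-key", event));
          }
      }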
index be70833..6ff9dcb 100644 (file)
@@ -99,8 +99,7 @@ public class KafkaConfig {
      *
      * @return an instance of legacy Kafka template.
      */
-    @Bean
-    @Primary
+    @Bean(name = "legacyEventKafkaTemplate")
     public KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate() {
         final KafkaTemplate<String, LegacyEvent> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
         kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
@@ -166,7 +165,8 @@ public class KafkaConfig {
      *
      * @return an instance of cloud Kafka template.
      */
-    @Bean
+    @Primary
+    @Bean(name = "cloudEventKafkaTemplate")
     public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
         final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
         kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
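With two KafkaTemplate<String, CloudEvent> beans now in the context (the existing cloud event
template and the new transactional one from ExactlyOnceSemanticsKafkaConfig), injection by type
alone would be ambiguous, so @Primary moves to cloudEventKafkaTemplate and both templates get
explicit bean names. A minimal sketch of selecting the EOS template by qualifier; the class name
is an illustrative assumption:

      import io.cloudevents.CloudEvent;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;

      @Component
      class EosTemplateClientSketch {

          private final KafkaTemplate<String, CloudEvent> eosTemplate;

          // The constructor qualifier picks the transactional bean instead of the @Primary one.
          EosTemplateClientSketch(@Qualifier("cloudEventKafkaTemplateForEos")
                                  final KafkaTemplate<String, CloudEvent> eosTemplate) {
              this.eosTemplate = eosTemplate;
          }
      }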
index 0eb6573..9589606 100644 (file)
@@ -34,8 +34,10 @@ import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 /**
  * Listener for AVC events based on CM Subscriptions.
@@ -64,8 +66,9 @@ public class CmAvcEventConsumer {
      *
      * @param cmAvcEventAsConsumerRecord Incoming raw consumer record
      */
+    @Transactional
     @KafkaListener(topics = "${app.dmi.cm-events.topic}",
-        containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+        containerFactory = "cloudEventConcurrentKafkaListenerContainerFactoryForEos")
     @Timed(value = "cps.ncmp.cm.notifications.consume.and.forward", description = "Time taken to forward CM AVC events")
     public void consumeAndForward(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
         if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) {
@@ -73,8 +76,14 @@ public class CmAvcEventConsumer {
         }
         final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
         final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
-        log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
-        eventsProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
+
+        // Only for testing/demo
+        if (outgoingAvcEventKey.equals("retry")) {
+            throw new KafkaException("test kafka exception for testing");
+        }
+
+        log.info("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
+        eventsProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
     }
 
     private void processCmAvcEventChanges(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
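When the listener above throws (for example the test-only KafkaException on the 'retry' key),
the container-managed Kafka transaction aborts: the offset is not committed, the record is
re-fetched and retried, and any event produced inside the aborted attempt is never exposed to
consumers reading with isolation.level=read_committed. A hedged sketch of such a downstream
check; the broker address, group id and topic are illustrative assumptions:

      import java.time.Duration;
      import java.util.List;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;

      class ReadCommittedCheckSketch {

          static void pollCommittedEventsOnly() {
              final Properties properties = new Properties();
              properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-check");
              properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
              properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
                  consumer.subscribe(List.of("cm-events"));
                  final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                  // Only events whose forwarding transaction committed are returned here.
                  records.forEach(consumerRecord -> System.out.println("committed key: " + consumerRecord.key()));
              }
          }
      }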
index b2e42ce..6ffe000 100644 (file)
@@ -26,6 +26,7 @@ import io.cloudevents.kafka.CloudEventSerializer
 import org.onap.cps.events.LegacyEvent
 import org.spockframework.spring.EnableSharedInjection
 import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Qualifier
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties
 import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.boot.test.context.SpringBootTest
@@ -36,7 +37,7 @@ import org.springframework.test.context.TestPropertySource
 import spock.lang.Shared
 import spock.lang.Specification
 
-@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig, ExactlyOnceSemanticsKafkaConfig])
 @EnableSharedInjection
 @EnableConfigurationProperties
 @TestPropertySource(properties = ["cps.tracing.enabled=true"])
@@ -44,12 +45,20 @@ class KafkaConfigSpec extends Specification {
 
     @Shared
     @Autowired
+    @Qualifier("legacyEventKafkaTemplate")
     KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate
 
     @Shared
     @Autowired
+    @Qualifier("cloudEventKafkaTemplate")
     KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
 
+    @Shared
+    @Autowired
+    @Qualifier("cloudEventKafkaTemplateForEos")
+    KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos
+
+
     def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
         expect: 'kafka template is instantiated'
             assert kafkaTemplateInstance.properties['beanName'] == beanName
@@ -58,8 +67,9 @@ class KafkaConfigSpec extends Specification {
         and: 'verify event key and value deserializer'
             assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
         where: 'the following event type is used'
-            eventType      | kafkaTemplateInstance    || beanName                   | valueSerializer      | delegateDeserializer
-            'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer       | JsonDeserializer
-            'cloud event'  | cloudEventKafkaTemplate  || 'cloudEventKafkaTemplate'  | CloudEventSerializer | CloudEventDeserializer
+            eventType                   | kafkaTemplateInstance         || beanName                        | valueSerializer      | delegateDeserializer
+            'legacy event'              | legacyEventKafkaTemplate      || 'legacyEventKafkaTemplate'      | JsonSerializer       | JsonDeserializer
+            'cloud event'               | cloudEventKafkaTemplate       || 'cloudEventKafkaTemplate'       | CloudEventSerializer | CloudEventDeserializer
+            'transactional cloud event' | cloudEventKafkaTemplateForEos || 'cloudEventKafkaTemplateForEos' | CloudEventSerializer | CloudEventDeserializer
     }
 }
index fec4fba..906779c 100644 (file)
@@ -44,7 +44,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase
 
     @SpringBean
     EventsProducer cpsAsyncRequestResponseEventProducer =
-        new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
+        new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
 
     @SpringBean
index 7b7faf3..420da6f 100644 (file)
@@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 class DataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
     @SpringBean
     DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
index a42bf1f..b55959d 100644 (file)
@@ -56,7 +56,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
     JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
 
     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
     def 'Process per data operation request with #serviceName.'() {
         given: 'data operation request with 3 operations'
index 5a0980c..82d979e 100644 (file)
@@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 class CmAvcEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
     def mockCmAvcEventService = Mock(CmAvcEventService)
     def mockInventoryPersistence = Mock(InventoryPersistence)
index ed984ec..5d974fe 100644 (file)
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.LegacyEvent
 import org.onap.cps.ncmp.events.lcm.v1.Event
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.utils.TestUtils
@@ -43,12 +44,12 @@ import java.time.Duration
 @DirtiesContext
 class EventsProducerSpec extends MessagingBaseSpec {
 
-    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
+    def legacyEventKafkaConsumer = new KafkaConsumer<String, LegacyEvent>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
 
     def testTopic = 'ncmp-events-test'
 
     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
index 21fc656..1ee936b 100644 (file)
@@ -32,7 +32,7 @@ import org.springframework.test.context.ContextConfiguration
 @ContextConfiguration(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper])
 class InventoryEventProducerSpec extends MessagingBaseSpec {
 
-    def mockEventsProducer = Mock(EventsProducer<CloudEvent>)
+    def mockEventsProducer = Mock(EventsProducer)
     def objectUnderTest = new InventoryEventProducer(mockEventsProducer)
 
     def 'Send an attribute value change event'() {
index ab6c3fd..58e3394 100644 (file)
@@ -24,6 +24,7 @@ import io.cloudevents.CloudEvent
 import io.cloudevents.kafka.CloudEventSerializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.cps.events.LegacyEvent
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.kafka.support.serializer.JsonSerializer
@@ -44,9 +45,11 @@ class MessagingBaseSpec extends Specification {
 
     static kafkaTestContainer = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0")
 
-    def legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, ?>(eventProducerConfigProperties(JsonSerializer)))
+    def legacyEventKafkaTemplate = new KafkaTemplate<String, LegacyEvent>(new DefaultKafkaProducerFactory<String, LegacyEvent>(eventProducerConfigProperties(JsonSerializer)))
 
-    def cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+    def cloudEventKafkaTemplate = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+
+    def cloudEventKafkaTemplateForEos = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
 
     @DynamicPropertySource
     static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
index 7d28dc6..61758a0 100644 (file)
@@ -28,6 +28,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
@@ -47,10 +49,15 @@ public class EventsProducer {
      * Note: Cloud events should be used. This will be addressed as part of  <a
      * href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
      */
-    private final KafkaTemplate<String, LegacyEvent> legacyKafkaEventTemplate;
+    @Qualifier("legacyEventKafkaTemplate")
+    private final KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate;
 
+    @Qualifier("cloudEventKafkaTemplate")
     private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
 
+    @Qualifier("cloudEventKafkaTemplateForEos")
+    private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos;
+
     /**
      * Generic CloudEvent sender.
      *
@@ -61,7 +68,7 @@ public class EventsProducer {
     public void sendCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
         final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
                 cloudEventKafkaTemplate.send(topicName, eventKey, event);
-        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
+        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
     }
 
     /**
@@ -75,7 +82,7 @@ public class EventsProducer {
      */
     public void sendLegacyEvent(final String topicName, final String eventKey, final LegacyEvent event) {
         final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
-                legacyKafkaEventTemplate.send(topicName, eventKey, event);
+                legacyEventKafkaTemplate.send(topicName, eventKey, event);
         handleLegacyEventCallback(topicName, eventFuture);
     }
 
@@ -92,7 +99,7 @@ public class EventsProducer {
         final ProducerRecord<String, LegacyEvent> producerRecord =
                 new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
         final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
-                legacyKafkaEventTemplate.send(producerRecord);
+                legacyEventKafkaTemplate.send(producerRecord);
         handleLegacyEventCallback(topicName, eventFuture);
     }
 
@@ -109,9 +116,23 @@ public class EventsProducer {
         sendLegacyEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
     }
 
+    /**
+     * Generic CloudEvent sender ensuring Exactly Once Semantics behaviour.
+     *
+     * @param topicName valid topic name
+     * @param eventKey  message key
+     * @param event     message payload
+     */
+    public void sendCloudEventUsingEos(final String topicName, final String eventKey, final CloudEvent event) {
+        final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
+                cloudEventKafkaTemplateForEos.send(topicName, eventKey, event);
+        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, true));
+    }
+
+
     private void handleLegacyEventCallback(final String topicName,
             final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture) {
-        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
+        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
     }
 
     private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
@@ -120,12 +141,16 @@ public class EventsProducer {
         return eventHeaders;
     }
 
-    private static void logOutcome(final String topicName, final SendResult<String, ?> result, final Throwable e) {
+    private static void logOutcome(final String topicName, final SendResult<String, ?> result, final Throwable e,
+            final boolean throwKafkaException) {
         if (e == null) {
             final Object event = result.getProducerRecord().value();
             log.debug("Successfully sent event to topic : {} , Event : {}", topicName, event);
         } else {
             log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage());
+            if (throwKafkaException && e instanceof KafkaException) {
+                throw (KafkaException) e;
+            }
         }
     }
 
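The new sendCloudEventUsingEos logs the send outcome from an asynchronous callback and rethrows
KafkaException from there when the EOS flag is set. A hedged alternative, shown only for
illustration and not what this change implements: blocking on the send future inside the
transactional listener makes a failed send throw on the listener thread itself, which
deterministically aborts the Kafka transaction and triggers redelivery.

      import io.cloudevents.CloudEvent;
      import java.util.concurrent.ExecutionException;
      import org.springframework.kafka.KafkaException;
      import org.springframework.kafka.core.KafkaTemplate;

      class SynchronousEosSendSketch {

          private final KafkaTemplate<String, CloudEvent> eosTemplate;

          SynchronousEosSendSketch(final KafkaTemplate<String, CloudEvent> eosTemplate) {
              this.eosTemplate = eosTemplate;
          }

          void sendAndWait(final String topic, final String key, final CloudEvent event) {
              try {
                  eosTemplate.send(topic, key, event).get(); // surfaces broker errors to the caller
              } catch (final InterruptedException e) {
                  Thread.currentThread().interrupt();
                  throw new KafkaException("Interrupted while sending to " + topic, e);
              } catch (final ExecutionException e) {
                  throw new KafkaException("Failed to send to " + topic, e.getCause());
              }
          }
      }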
index bf54378..8c71fea 100644 (file)
@@ -41,8 +41,9 @@ import java.util.concurrent.CompletableFuture
 
 class EventsProducerSpec extends Specification {
 
-    def legacyKafkaTemplateMock = Mock(KafkaTemplate)
+    def mockLegacyKafkaTemplate = Mock(KafkaTemplate)
     def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
+    def mockCloudEventKafkaTemplateForEos = Mock(KafkaTemplate)
     def logger = Spy(ListAppender<ILoggingEvent>)
 
     void setup() {
@@ -56,7 +57,7 @@ class EventsProducerSpec extends Specification {
         ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
     }
 
-    def objectUnderTest = new EventsProducer(legacyKafkaTemplateMock, mockCloudEventKafkaTemplate)
+    def objectUnderTest = new EventsProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos)
 
     def 'Send Cloud Event'() {
         given: 'a successfully sent event'
@@ -71,9 +72,23 @@ class EventsProducerSpec extends Specification {
         when: 'sending the cloud event'
             objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
         then: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event')
+    }
+
+    def 'Send Cloud Event Using EOS'() {
+        given: 'a successful result from sending an event with exactly-once semantics'
+            def eventFuture = CompletableFuture.completedFuture(
+                new SendResult(
+                    new ProducerRecord('eos-topic', 'some-value'),
+                    new RecordMetadata(new TopicPartition('eos-topic', 0), 0, 0, 0, 0, 0)
+                )
+            )
+            def someCloudEvent = Mock(CloudEvent)
+            1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFuture
+        when: 'sending the cloud event using EOS'
+            objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
+        then: 'the correct debug message is logged'
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }
 
     def 'Send Cloud Event with Exception'() {
@@ -85,9 +100,20 @@ class EventsProducerSpec extends Specification {
         when: 'sending the cloud event'
             objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
         then: 'the correct error message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.ERROR
-            assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
+            assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
+    }
+
+    def 'Send Cloud Event Using EOS with KafkaException'() {
+        given: 'an event fails with KafkaException'
+            def kafkaException = new org.springframework.kafka.KafkaException('some kafka exception')
+            def eventFutureWithFailure = new CompletableFuture<SendResult<String, CloudEvent>>()
+            eventFutureWithFailure.completeExceptionally(kafkaException)
+            def someCloudEvent = Mock(CloudEvent)
+            1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure
+        when: 'sending the cloud event using EOS'
+            objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
+        then: 'the correct error message is logged'
+            assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
     }
 
     def 'Send Legacy Event'() {
@@ -99,13 +125,11 @@ class EventsProducerSpec extends Specification {
                 )
             )
             def someLegacyEvent = Mock(LegacyEvent)
-            1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someLegacyEvent) >> eventFuture
+            1 * mockLegacyKafkaTemplate.send('some-topic', 'some-event-key', someLegacyEvent) >> eventFuture
         when: 'sending the cloud event'
             objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', someLegacyEvent)
         then: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }
 
     def 'Send Legacy Event with Headers as Map'() {
@@ -121,11 +145,9 @@ class EventsProducerSpec extends Specification {
         when: 'sending the legacy event'
             objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
         then: 'event is sent'
-            1 * legacyKafkaTemplateMock.send(_) >> eventFuture
+            1 * mockLegacyKafkaTemplate.send(_) >> eventFuture
         and: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }
 
     def 'Send Legacy Event with Record Headers'() {
@@ -142,11 +164,9 @@ class EventsProducerSpec extends Specification {
         when: 'sending the legacy event'
             objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
         then: 'event is sent'
-            1 * legacyKafkaTemplateMock.send(_) >> eventFuture
+            1 * mockLegacyKafkaTemplate.send(_) >> eventFuture
         and: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }
 
     def 'Handle Legacy Event Callback'() {
@@ -160,9 +180,7 @@ class EventsProducerSpec extends Specification {
         when: 'handling legacy event callback'
             objectUnderTest.handleLegacyEventCallback('some-topic', eventFuture)
         then: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }
 
     def 'Handle Legacy Event Callback with Exception'() {
@@ -172,9 +190,21 @@ class EventsProducerSpec extends Specification {
         when: 'handling legacy event callback'
             objectUnderTest.handleLegacyEventCallback('some-topic', eventFutureWithFailure)
         then: 'the correct error message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.ERROR
-            assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
+            assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
+    }
+
+    def 'Logging of non-kafka exceptions'() {
+        given: 'a runtime exception that is not KafkaException'
+            def sendResult = Mock(SendResult) {
+                getProducerRecord() >> Mock(ProducerRecord)
+            }
+            def runtimeException = new RuntimeException('some runtime exception')
+            def logOutcomeMethod = EventsProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean)
+            logOutcomeMethod.accessible = true
+        when: 'logging the outcome with throwKafkaException set to true'
+            logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException, true)
+        then: 'error message is logged'
+            assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
     }
 
     def 'Convert to kafka headers'() {
@@ -189,4 +219,9 @@ class EventsProducerSpec extends Specification {
             assert headers[1].key() == 'key2'
     }
 
+    def verifyLoggingEvent(expectedLevel, expectedFormattedMessage) {
+        def lastLoggingEvent = logger.list[0]
+        lastLoggingEvent.level == expectedLevel && lastLoggingEvent.formattedMessage.contains(expectedFormattedMessage)
+    }
+
 }
\ No newline at end of file
index 8b55d07..a57539e 100644 (file)
@@ -164,6 +164,8 @@ services:
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,CONNECTIONS_FROM_HOST://localhost:9092
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
     healthcheck:
       test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1
       interval: 10s
index b379d6e..8084673 100644 (file)
@@ -23,8 +23,6 @@ package org.onap.cps.integration.base
 
 import com.hazelcast.map.IMap
 import okhttp3.mockwebserver.MockWebServer
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.api.CpsAnchorService
 import org.onap.cps.api.CpsDataService
 import org.onap.cps.api.CpsDataspaceService
@@ -81,8 +79,6 @@ import java.util.concurrent.BlockingQueue
 @EntityScan('org.onap.cps.ri.models')
 abstract class CpsIntegrationSpecBase extends Specification {
 
-    static KafkaConsumer kafkaConsumer
-
     @Shared
     DatabaseTestContainer databaseTestContainer = DatabaseTestContainer.getInstance()
 
@@ -344,13 +340,7 @@ abstract class CpsIntegrationSpecBase extends Specification {
         networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
     }
 
-    def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
-        kafkaConsumer = KafkaTestContainer.getConsumer(consumerGroupId, StringDeserializer.class)
-        kafkaConsumer.subscribe([topicName])
-        kafkaConsumer.poll(Duration.ofMillis(500))
-    }
-
-    def getLatestConsumerRecordsWithMaxPollOf1Second(numberOfRecordsToRead) {
+    def getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, numberOfRecordsToRead) {
         def consumerRecords = []
         def retryAttempts = 10
         while (consumerRecords.size() < numberOfRecordsToRead) {
index eb1484b..89e38c1 100644 (file)
@@ -21,8 +21,6 @@
 package org.onap.cps.integration.functional.ncmp.data
 
 import io.cloudevents.CloudEvent
-import io.cloudevents.kafka.CloudEventDeserializer
-import java.time.Duration
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
@@ -32,6 +30,8 @@ import org.onap.cps.ncmp.events.async1_0_0.Response
 import org.springframework.http.MediaType
 import spock.util.concurrent.PollingConditions
 
+import java.time.Duration
+
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status
 
@@ -40,7 +40,7 @@ class LegacyBatchDataOperationSpec extends CpsIntegrationSpecBase {
     KafkaConsumer kafkaConsumer
 
     def setup() {
-        kafkaConsumer = KafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
+        kafkaConsumer = KafkaTestContainer.getCloudEventConsumer('test-group')
         kafkaConsumer.subscribe(['legacy-batch-topic'])
         kafkaConsumer.poll(Duration.ofMillis(500))
         dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
@@ -90,7 +90,7 @@ class LegacyBatchDataOperationSpec extends CpsIntegrationSpecBase {
                     .andExpect(status().is2xxSuccessful())
 
         then: 'there is one kafka message'
-            def consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000))
+            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)
             assert consumerRecords.size() == 1
 
         and: 'it is a cloud event'
index 5a13cb2..f19a13f 100644 (file)
@@ -62,7 +62,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     def setup() {
         registerCmHandlesForSubscriptions()
         kafkaTestContainer.start()
-        dmiInConsumer = kafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
+        dmiInConsumer = kafkaTestContainer.getCloudEventConsumer('test-group')
         dmiInConsumer.subscribe([dmiInTopic])
         dmiInConsumer.poll(Duration.ofMillis(500))
         testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
@@ -223,7 +223,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     }
 
     def getAllConsumedCorrelationIds() {
-        def consumedEvents = dmiInConsumer.poll(Duration.ofMillis(1000))
+        def consumedEvents = getLatestConsumerRecordsWithMaxPollOf1Second(dmiInConsumer, 1)
         def headersMap = getAllHeaders(consumedEvents)
         return headersMap.get('ce_correlationid')
     }
index 2e1c803..ca8ebb0 100644 (file)
@@ -20,7 +20,9 @@
 
 package org.onap.cps.integration.functional.ncmp.inventory
 
-
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.cps.events.LegacyEvent
+import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.api.NcmpResponseStatus
 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
@@ -31,11 +33,15 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
 
+import java.time.Duration
+
 class CmHandleCreateSpec extends CpsIntegrationSpecBase {
 
     NetworkCmProxyInventoryFacadeImpl objectUnderTest
     def uniqueId = 'ch-unique-id-for-create-test'
 
+    KafkaConsumer<String, LegacyEvent> kafkaConsumer
+
     def setup() {
         objectUnderTest = networkCmProxyInventoryFacade
         subscribeAndClearPreviousMessages('test-group', 'ncmp-events')
@@ -72,7 +78,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
 
         then: 'get the latest messages'
-            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(2)
+            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
 
         and: 'both converted messages are for the correct cm handle'
             def notificationMessages = []
@@ -90,7 +96,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             assert notificationMessages[1].event.dataProducerIdentifier == 'my-data-producer-identifier'
 
         and: 'there are no more messages to be read'
-            assert getLatestConsumerRecordsWithMaxPollOf1Second(1).size() == 0
+            assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1).size() == 0
 
         cleanup: 'deregister CM handle'
             deregisterCmHandle(DMI1_URL, uniqueId)
@@ -215,4 +221,10 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
     }
 
+    def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
+        kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
+        kafkaConsumer.subscribe([topicName])
+        kafkaConsumer.poll(Duration.ofMillis(500))
+    }
+
 }
index 18c7096..1bb7ee2 100644 (file)
@@ -20,6 +20,9 @@
 
 package org.onap.cps.integration.functional.ncmp.inventory
 
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.cps.events.LegacyEvent
+import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.api.NcmpResponseStatus
 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
@@ -28,10 +31,15 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
 
+import java.time.Duration
+
 class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
 
     NetworkCmProxyInventoryFacadeImpl objectUnderTest
 
+    KafkaConsumer<String, LegacyEvent> kafkaConsumer
+
+
     def setup() {
         objectUnderTest = networkCmProxyInventoryFacade
         subscribeAndClearPreviousMessages('test-group-for-update', 'ncmp-events')
@@ -111,7 +119,7 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
 
         and: 'flush the latest cm handle registration events( state transition from NONE to ADVISED and ADVISED to READY)'
-            getLatestConsumerRecordsWithMaxPollOf1Second(2)
+            getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
 
         and: 'cm handle updated with the data producer identifier'
             def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id')
@@ -122,7 +130,7 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
             assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
 
         and: 'get the latest message'
-            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(1)
+            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)
 
         and: 'the message has the updated data producer identifier'
             def notificationMessages = []
@@ -136,4 +144,9 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
             deregisterCmHandle(DMI1_URL, cmHandleId)
     }
 
+    def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
+        kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
+        kafkaConsumer.subscribe([topicName])
+        kafkaConsumer.poll(Duration.ofMillis(500))
+    }
 }
index 6494ab9..0773590 100644 (file)
@@ -19,6 +19,8 @@
 
 package org.onap.cps.integration;
 
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.CloudEventDeserializer;
 import java.util.HashMap;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +30,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.onap.cps.events.LegacyEvent;
 import org.testcontainers.kafka.ConfluentKafkaContainer;
 
 /**
@@ -68,8 +71,12 @@ public class KafkaTestContainer extends ConfluentKafkaContainer {
         return kafkaTestContainer;
     }
 
-    public static KafkaConsumer getConsumer(final String consumerGroupId, final Object valueDeserializer) {
-        return new KafkaConsumer<>(consumerProperties(consumerGroupId, valueDeserializer));
+    public static KafkaConsumer<String, LegacyEvent> getLegacyEventConsumer(final String consumerGroupId) {
+        return new KafkaConsumer<>(consumerProperties(consumerGroupId, StringDeserializer.class));
+    }
+
+    public static KafkaConsumer<String, CloudEvent> getCloudEventConsumer(final String consumerGroupId) {
+        return new KafkaConsumer<>(consumerProperties(consumerGroupId, CloudEventDeserializer.class));
     }
 
     /**
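A hedged usage sketch of the new typed helpers in an integration test: a cloud event consumer for
the forwarded AVC topic and, analogously, getLegacyEventConsumer for LCM notifications. The group
id and topic name are illustrative assumptions taken from the test diffs above.

      import io.cloudevents.CloudEvent;
      import java.time.Duration;
      import java.util.List;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.onap.cps.integration.KafkaTestContainer;

      class KafkaTestContainerUsageSketch {

          static void readForwardedAvcEvents() {
              final KafkaConsumer<String, CloudEvent> cloudEventConsumer =
                      KafkaTestContainer.getCloudEventConsumer("test-group");
              cloudEventConsumer.subscribe(List.of("cm-events"));
              cloudEventConsumer.poll(Duration.ofMillis(500)); // drain anything already on the topic
              // ... trigger the flow under test, then poll again and assert on the returned records
          }
      }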
index efe595e..a59c0b8 100644 (file)
@@ -184,6 +184,10 @@ ncmp:
   model-loader:
     maximum-attempt-count: 20
 
+  notifications:
+    avc-event-producer:
+      transaction-id-prefix: integration-test-tx-
+
   servlet:
     multipart:
       enabled: true