Revert "Implement Exactly Once Semantics for CmAvcEvents flow" 64/142864/2
authoremaclee <lee.anjella.macabuhay@est.tech>
Tue, 6 Jan 2026 09:53:54 +0000 (09:53 +0000)
committerLee Anjella Macabuhay <lee.anjella.macabuhay@est.tech>
Wed, 7 Jan 2026 13:51:14 +0000 (13:51 +0000)
This reverts commit bab08c745e60f339107bb8265d2e198702f85742.

Issue-ID: CPS-3103
Change-Id: Ifd17870ead776dab9a813c6f628efdeb57feba57
Signed-off-by: emaclee <lee.anjella.macabuhay@est.tech>
17 files changed:
cps-application/src/main/resources/application.yml
cps-application/src/test/resources/application.yml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java [deleted file]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventProducerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/CloudEventMapperSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy
cps-service/src/main/java/org/onap/cps/events/EventProducer.java
cps-service/src/test/groovy/org/onap/cps/events/EventProducerSpec.groovy
docker-compose/docker-compose.yml
integration-test/src/test/resources/application.yml

index 743da6d..ba15152 100644 (file)
@@ -243,10 +243,6 @@ ncmp:
     model-loader:
         maximum-attempt-count: 20
 
-    notifications:
-        avc-event-producer:
-            transaction-id-prefix: tx-
-
 # Custom Hazelcast Config.
 hazelcast:
     cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
index 107facb..0bb43b0 100644 (file)
@@ -237,10 +237,6 @@ ncmp:
     model-loader:
         maximum-attempt-count: 20
 
-    notifications:
-        avc-event-producer:
-            transaction-id-prefix: test-tx-
-
 # Custom Hazelcast Config.
 hazelcast:
     cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java
deleted file mode 100644 (file)
index 33dfada..0000000
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.config;
-
-import io.cloudevents.CloudEvent;
-import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
-import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
-import java.time.Duration;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.ssl.SslBundles;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.KafkaException;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.listener.ContainerProperties;
-import org.springframework.kafka.listener.DefaultErrorHandler;
-import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
-import org.springframework.kafka.transaction.KafkaTransactionManager;
-
-/**
- * kafka Configuration for implementing Exactly Once Semantics using cloud events.
- */
-@Configuration
-@EnableKafka
-@RequiredArgsConstructor
-public class ExactlyOnceSemanticsKafkaConfig {
-
-    private final KafkaProperties kafkaProperties;
-
-    @Value("${cps.tracing.enabled:false}")
-    private boolean tracingEnabled;
-
-    @Value("${ncmp.notifications.avc-event-producer.transaction-id-prefix:tx-}")
-    private String transactionIdPrefixForExactlyOnceSemantics;
-
-    private static final SslBundles NO_SSL = null;
-
-
-    /**
-     * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
-     * application.yml with CloudEventSerializer.This factory is configured to support
-     * exactly-once semantics by enabling idempotence and setting a transaction ID prefix.
-     *
-     * @return cloud event producer instance.
-     */
-    @Bean
-    public ProducerFactory<String, CloudEvent> cloudEventProducerFactoryForEos() {
-        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
-        producerConfigProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-        producerConfigProperties.put(ProducerConfig.ACKS_CONFIG, "all");
-        if (tracingEnabled) {
-            producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                    TracingProducerInterceptor.class.getName());
-        }
-        final DefaultKafkaProducerFactory<String, CloudEvent> defaultKafkaProducerFactory =
-                new DefaultKafkaProducerFactory<>(producerConfigProperties);
-        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefixForExactlyOnceSemantics);
-        return defaultKafkaProducerFactory;
-    }
-
-    /**
-     * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
-     * into application.yml having CloudEventDeserializer as deserializer-value.This factory is configured with
-     * read_committed isolation level to support exactly-once semantics.
-     *
-     * @return an instance of cloud consumer factory.
-     */
-    @Bean
-    public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactoryForEos() {
-        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
-        consumerConfigProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
-        if (tracingEnabled) {
-            consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                    TracingConsumerInterceptor.class.getName());
-        }
-        return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
-    }
-
-
-    /**
-     * A cloud Kafka event template for executing high-level operations. The template is configured using the cloud
-     * event producer and consumer factories to support
-     * exactly-once semantics.
-     *
-     * @return an instance of cloud Kafka template.
-     */
-    @Bean(name = "cloudEventKafkaTemplateForEos")
-    public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos() {
-        final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactoryForEos());
-        kafkaTemplate.setConsumerFactory(cloudEventConsumerFactoryForEos());
-        if (tracingEnabled) {
-            kafkaTemplate.setObservationEnabled(true);
-        }
-        return kafkaTemplate;
-    }
-
-    /**
-     * A Concurrent CloudEvent kafka listener container factory.
-     * This factory supports exactly-once semantics, retry handling, and optional tracing.
-     *
-     * @return instance of Concurrent kafka listener factory
-     */
-    @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
-                            cloudEventConcurrentKafkaListenerContainerFactoryForEos() {
-        final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
-                new ConcurrentKafkaListenerContainerFactory<>();
-        containerFactory.setConsumerFactory(cloudEventConsumerFactoryForEos());
-        containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
-        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
-        containerFactory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManagerForEos());
-        containerFactory.setCommonErrorHandler(kafkaErrorHandlerWithMaxRetriesForEos());
-        if (tracingEnabled) {
-            containerFactory.getContainerProperties().setObservationEnabled(true);
-        }
-
-        return containerFactory;
-    }
-
-    private KafkaTransactionManager<String, CloudEvent> kafkaTransactionManagerForEos() {
-        return new KafkaTransactionManager<>(cloudEventProducerFactoryForEos());
-    }
-
-    private DefaultErrorHandler kafkaErrorHandlerWithMaxRetriesForEos() {
-
-        final ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries =
-                new ExponentialBackOffWithMaxRetries(Integer.MAX_VALUE);
-        exponentialBackOffWithMaxRetries.setInitialInterval(1000L); // 1 sec
-        exponentialBackOffWithMaxRetries.setMultiplier(2.0);
-        exponentialBackOffWithMaxRetries.setMaxInterval(30_000L); // 30 sec
-        final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(exponentialBackOffWithMaxRetries);
-        defaultErrorHandler.addRetryableExceptions(KafkaException.class);
-
-        return defaultErrorHandler;
-    }
-}
index 6ff9dcb..be70833 100644 (file)
@@ -99,7 +99,8 @@ public class KafkaConfig {
      *
      * @return an instance of legacy Kafka template.
      */
-    @Bean(name = "legacyEventKafkaTemplate")
+    @Bean
+    @Primary
     public KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate() {
         final KafkaTemplate<String, LegacyEvent> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
         kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
@@ -165,8 +166,7 @@ public class KafkaConfig {
      *
      * @return an instance of cloud Kafka template.
      */
-    @Primary
-    @Bean(name = "cloudEventKafkaTemplate")
+    @Bean
     public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
         final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
         kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
index cb5dc2e..7966586 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (c) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
+ * Copyright (c) 2023-2026 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -36,7 +36,6 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
 
 /**
  * Listener for AVC events based on CM Subscriptions.
@@ -64,9 +63,8 @@ public class CmAvcEventConsumer {
      *
      * @param cmAvcEventAsConsumerRecord Incoming raw consumer record
      */
-    @Transactional
     @KafkaListener(topics = "${app.dmi.cm-events.topic}",
-        containerFactory = "cloudEventConcurrentKafkaListenerContainerFactoryForEos")
+        containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
     @Timed(value = "cps.ncmp.cm.notifications.consume.and.forward", description = "Time taken to forward CM AVC events")
     public void consumeAndForward(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
         if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) {
@@ -74,9 +72,8 @@ public class CmAvcEventConsumer {
         }
         final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
         final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
-
         log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
-        eventProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
+        eventProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
     }
 
     private void processCmAvcEventChanges(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
@@ -93,6 +90,6 @@ public class CmAvcEventConsumer {
 
     private boolean isEventFromOnapDmiPlugin(final Headers headers) {
         final String sourceSystem = KafkaHeaders.getParsedKafkaHeader(headers, CLOUD_EVENT_SOURCE_SYSTEM_HEADER_KEY);
-        return "ONAP-DMI-PLUGIN".equals(sourceSystem);
+        return sourceSystem != null && sourceSystem.equals("ONAP-DMI-PLUGIN");
     }
 }
index 6ffe000..e4db82e 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
+ *  Copyright (C) 2023-2026 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -26,7 +26,6 @@ import io.cloudevents.kafka.CloudEventSerializer
 import org.onap.cps.events.LegacyEvent
 import org.spockframework.spring.EnableSharedInjection
 import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.beans.factory.annotation.Qualifier
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties
 import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.boot.test.context.SpringBootTest
@@ -37,7 +36,7 @@ import org.springframework.test.context.TestPropertySource
 import spock.lang.Shared
 import spock.lang.Specification
 
-@SpringBootTest(classes = [KafkaProperties, KafkaConfig, ExactlyOnceSemanticsKafkaConfig])
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
 @EnableSharedInjection
 @EnableConfigurationProperties
 @TestPropertySource(properties = ["cps.tracing.enabled=true"])
@@ -45,20 +44,12 @@ class KafkaConfigSpec extends Specification {
 
     @Shared
     @Autowired
-    @Qualifier("legacyEventKafkaTemplate")
     KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate
 
     @Shared
     @Autowired
-    @Qualifier("cloudEventKafkaTemplate")
     KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
 
-    @Shared
-    @Autowired
-    @Qualifier("cloudEventKafkaTemplateForEos")
-    KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos
-
-
     def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
         expect: 'kafka template is instantiated'
             assert kafkaTemplateInstance.properties['beanName'] == beanName
@@ -67,9 +58,8 @@ class KafkaConfigSpec extends Specification {
         and: 'verify event key and value deserializer'
             assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
         where: 'the following event type is used'
-            eventType                   | kafkaTemplateInstance         || beanName                        | valueSerializer      | delegateDeserializer
-            'legacy event'              | legacyEventKafkaTemplate      || 'legacyEventKafkaTemplate'      | JsonSerializer       | JsonDeserializer
-            'cloud event'               | cloudEventKafkaTemplate       || 'cloudEventKafkaTemplate'       | CloudEventSerializer | CloudEventDeserializer
-            'transactional cloud event' | cloudEventKafkaTemplateForEos || 'cloudEventKafkaTemplateForEos' | CloudEventSerializer | CloudEventDeserializer
+            eventType      | kafkaTemplateInstance    || beanName                   | valueSerializer      | delegateDeserializer
+            'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer       | JsonDeserializer
+            'cloud event'  | cloudEventKafkaTemplate  || 'cloudEventKafkaTemplate'  | CloudEventSerializer | CloudEventDeserializer
     }
 }
index efd0055..b0b200a 100644 (file)
@@ -44,7 +44,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase
 
     @SpringBean
     EventProducer cpsAsyncRequestResponseEventProducer =
-        new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+        new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
 
 
     @SpringBean
index 9c32ce9..166ebdf 100644 (file)
@@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 class DataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventProducer asyncDataOperationEventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+    EventProducer asyncDataOperationEventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
     DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
index 0d6fda3..943377a 100644 (file)
@@ -55,7 +55,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
     JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
 
     @SpringBean
-    EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+    EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     def 'Process per data operation request with #serviceName.'() {
         given: 'data operation request with 3 operations'
index e8b860f..5912916 100644 (file)
@@ -51,13 +51,13 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 class CmAvcEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+    EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     def mockCmAvcEventService = Mock(CmAvcEventService)
     def mockInventoryPersistence = Mock(InventoryPersistence)
 
     @SpringBean
-    CmAvcEventConsumer objectUnderTest = new CmAvcEventConsumer(eventsProducer, mockCmAvcEventService, mockInventoryPersistence)
+    CmAvcEventConsumer objectUnderTest = new CmAvcEventConsumer(eventProducer, mockCmAvcEventService, mockInventoryPersistence)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
@@ -148,4 +148,4 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec {
             .withSource(URI.create(sourceSystem as String))
             .withExtension('correlationid', cmHandleId).build()
     }
-}
+}
\ No newline at end of file
index ea57d75..3845f69 100644 (file)
@@ -45,10 +45,11 @@ import java.time.Duration
 class EventProducerSpec extends MessagingBaseSpec {
 
     def legacyEventKafkaConsumer = new KafkaConsumer<String, LegacyEvent>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
+
     def testTopic = 'ncmp-events-test'
 
     @SpringBean
-    EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+    EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/CloudEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/CloudEventMapperSpec.groovy
new file mode 100644 (file)
index 0000000..273f978
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2026 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.utils.events
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.slf4j.LoggerFactory
+import spock.lang.Specification
+
+class CloudEventMapperSpec extends Specification {
+
+    def logger = Spy(ListAppender<ILoggingEvent>)
+
+    void setup() {
+        def setupLogger = ((Logger) LoggerFactory.getLogger(CloudEventMapper.class))
+        setupLogger.setLevel(Level.DEBUG)
+        setupLogger.addAppender(logger)
+        logger.start()
+    }
+
+    void cleanup() {
+        ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders()
+    }
+
+    def 'Map cloud event with runtime exception'() {
+        given: 'a cloud event with invalid data'
+            def cloudEvent = CloudEventBuilder.v1()
+                .withId('test-id')
+                .withType('test-type')
+                .withSource(URI.create('test-source'))
+                .withData('invalid-json-data'.bytes)
+                .build()
+        when: 'mapping to target event class'
+            CloudEventMapper.toTargetEvent(cloudEvent, String.class)
+        then: 'error message is logged'
+            def loggingEvents = logger.list
+            assert loggingEvents.size() >= 1
+            def errorEvent = loggingEvents.find { it.level == Level.ERROR }
+            assert errorEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
+            assert errorEvent.formattedMessage.contains('class java.lang.String')
+    }
+}
\ No newline at end of file
index 58e3394..39b549e 100644 (file)
@@ -48,9 +48,6 @@ class MessagingBaseSpec extends Specification {
     def legacyEventKafkaTemplate = new KafkaTemplate<String, LegacyEvent>(new DefaultKafkaProducerFactory<String, LegacyEvent>(eventProducerConfigProperties(JsonSerializer)))
 
     def cloudEventKafkaTemplate = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
-
-    def cloudEventKafkaTemplateForEos = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
-
     @DynamicPropertySource
     static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
         dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
index b0fc551..8369d75 100644 (file)
@@ -29,7 +29,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
@@ -55,9 +54,6 @@ public class EventProducer {
     @Qualifier("cloudEventKafkaTemplate")
     private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
 
-    @Qualifier("cloudEventKafkaTemplateForEos")
-    private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos;
-
     /**
      * Generic CloudEvent sender.
      *
@@ -68,7 +64,7 @@ public class EventProducer {
     public void sendCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
         final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
                 cloudEventKafkaTemplate.send(topicName, eventKey, event);
-        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
+        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
     }
 
     /**
@@ -86,7 +82,6 @@ public class EventProducer {
         handleLegacyEventCallback(topicName, eventFuture);
     }
 
-
     /**
      * Legacy Event sender with headers in a Map. Schemas that implement LegacyEvent are eligible to use this method.
      *
@@ -107,22 +102,10 @@ public class EventProducer {
         handleLegacyEventCallback(topicName, eventFuture);
     }
 
-    /**
-     * Generic CloudEvent sender ensuring Exactly Once Semantics behaviour.
-     *
-     * @param topicName valid topic name
-     * @param eventKey  message key
-     * @param event     message payload
-     */
-    public void sendCloudEventUsingEos(final String topicName, final String eventKey, final CloudEvent event) {
-        final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
-                cloudEventKafkaTemplateForEos.send(topicName, eventKey, event);
-        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, true));
-    }
 
     private void handleLegacyEventCallback(final String topicName,
             final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture) {
-        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
+        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
     }
 
     private Headers convertToKafkaHeaders(final Map<String, Object> headersAsMap) {
@@ -131,16 +114,12 @@ public class EventProducer {
         return headers;
     }
 
-    private static void logOutcome(final String topicName, final SendResult<String, ?> result, final Throwable e,
-            final boolean throwKafkaException) {
+    private static void logOutcome(final String topicName, final SendResult<String, ?> result, final Throwable e) {
         if (e == null) {
             final Object event = result.getProducerRecord().value();
             log.debug("Successfully sent event to topic : {} , Event : {}", topicName, event);
         } else {
             log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage());
-            if (throwKafkaException && e instanceof KafkaException) {
-                throw (KafkaException) e;
-            }
         }
     }
 
index 528a859..ef8cafb 100644 (file)
@@ -32,7 +32,6 @@ import org.apache.kafka.common.header.Headers
 import org.slf4j.LoggerFactory
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.kafka.support.SendResult
-import org.springframework.util.SerializationUtils
 import spock.lang.Specification
 
 import java.util.concurrent.CompletableFuture
@@ -41,7 +40,6 @@ class EventProducerSpec extends Specification {
 
     def mockLegacyKafkaTemplate = Mock(KafkaTemplate)
     def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
-    def mockCloudEventKafkaTemplateForEos = Mock(KafkaTemplate)
     def logger = Spy(ListAppender<ILoggingEvent>)
 
     void setup() {
@@ -55,7 +53,7 @@ class EventProducerSpec extends Specification {
         ((Logger) LoggerFactory.getLogger(EventProducer.class)).detachAndStopAllAppenders()
     }
 
-    def objectUnderTest = new EventProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos)
+    def objectUnderTest = new EventProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate)
 
     def 'Send Cloud Event'() {
         given: 'a successfully sent event'
@@ -73,22 +71,6 @@ class EventProducerSpec extends Specification {
             assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event')
     }
 
-    def 'Send Cloud Event Using EOS'() {
-        given: 'a successfull result from send event with EOS semantics'
-            def eventFuture = CompletableFuture.completedFuture(
-                new SendResult(
-                    new ProducerRecord('eos-topic', 'some-value'),
-                    new RecordMetadata(new TopicPartition('eos-topic', 0), 0, 0, 0, 0, 0)
-                )
-            )
-            def someCloudEvent = Mock(CloudEvent)
-            1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFuture
-        when: 'sending the cloud event using EOS'
-            objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
-        then: 'the correct debug message is logged'
-            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
-    }
-
     def 'Send Cloud Event with Exception'() {
         given: 'a failed event'
             def eventFutureWithFailure = new CompletableFuture<SendResult<String, String>>()
@@ -101,19 +83,6 @@ class EventProducerSpec extends Specification {
             assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
     }
 
-    def 'Send Cloud Event Using EOS with KafkaException'() {
-        given: 'an event fails with KafkaException'
-            def kafkaException = new org.springframework.kafka.KafkaException('some kafka exception')
-            def eventFutureWithFailure = new CompletableFuture<SendResult<String, CloudEvent>>()
-            eventFutureWithFailure.completeExceptionally(kafkaException)
-            def someCloudEvent = Mock(CloudEvent)
-            1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure
-        when: 'sending the cloud event using EOS'
-            objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
-        then: 'the correct error message is logged'
-            assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
-    }
-
     def 'Send Legacy Event'() {
         given: 'a successfully sent event'
             def eventFuture = CompletableFuture.completedFuture(
@@ -132,7 +101,7 @@ class EventProducerSpec extends Specification {
 
     def 'Send Legacy Event with Headers as Map'() {
         given: 'a successfully sent event'
-            def sampleEventHeaders = ['k1': SerializationUtils.serialize('v1')]
+            def sampleEventHeaders = ['k1': 'v1', 'k2': 'v2']
             def eventFuture = CompletableFuture.completedFuture(
                 new SendResult(
                     new ProducerRecord('some-topic', 'some-value'),
@@ -178,10 +147,10 @@ class EventProducerSpec extends Specification {
                 getProducerRecord() >> Mock(ProducerRecord)
             }
             def runtimeException = new RuntimeException('some runtime exception')
-            def logOutcomeMethod = EventProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean)
+            def logOutcomeMethod = EventProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable)
             logOutcomeMethod.accessible = true
         when: 'logging the outcome with throwKafkaException set to true'
-            logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException, true)
+            logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException)
         then: 'error message is logged'
             assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
     }
@@ -203,4 +172,4 @@ class EventProducerSpec extends Specification {
         lastLoggingEvent.level == expectedLevel && lastLoggingEvent.formattedMessage.contains(expectedFormattedMessage)
     }
 
-}
+}
\ No newline at end of file
index 2ff7dcf..5834689 100644 (file)
@@ -164,8 +164,6 @@ services:
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,CONNECTIONS_FROM_HOST://localhost:9092
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
-      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
     healthcheck:
       test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1
       interval: 10s
index a59c0b8..efe595e 100644 (file)
@@ -184,10 +184,6 @@ ncmp:
   model-loader:
     maximum-attempt-count: 20
 
-  notifications:
-    avc-event-producer:
-      transaction-id-prefix: integration-test-tx-
-
   servlet:
     multipart:
       enabled: true