--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2026 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.config;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
+import static org.springframework.kafka.listener.ContainerProperties.AckMode.BATCH;
+
+import io.cloudevents.CloudEvent;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
+import java.time.Duration;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.ssl.SslBundles;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.KafkaException;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
+import org.springframework.kafka.transaction.KafkaTransactionManager;
+
+/**
+ * Kafka Configuration for Exactly Once Semantics using cloud events.
+ *
+ * <p>Note: When concurrency > 1, message ordering within a partition is NOT guaranteed.
+ * Use concurrency = 1 if strict ordering is required.
+ */
+@Configuration
+@EnableKafka
+@RequiredArgsConstructor
+@ConditionalOnProperty(name = "ncmp.kafka.eos.enabled")
+public class EosKafkaConfig {
+
+ private final KafkaProperties kafkaProperties;
+
+ @Value("${cps.tracing.enabled:false}")
+ private boolean tracingEnabled;
+
+ @Value("${ncmp.notifications.avc-event-producer.transaction-id-prefix:tx-}")
+ private String transactionIdPrefix;
+
+ @Value("${ncmp.notifications.avc-event-consumer.concurrency:1}")
+ private int concurrency;
+
+ @Value("${ncmp.notifications.avc-event-consumer.max-poll-records:500}")
+ private String maxPollRecords;
+
+ private static final UUID CPS_NCMP_INSTANCE_UUID = UUID.randomUUID();
+ private static final SslBundles NO_SSL = null;
+
+ /**
+ * Producer factory configured for exactly-once semantics.
+ *
+ * @return producer factory instance
+ */
+ @Bean
+ public ProducerFactory<String, CloudEvent> cloudEventProducerFactoryForEos() {
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
+ producerConfigProperties.put(ENABLE_IDEMPOTENCE_CONFIG, true);
+ producerConfigProperties.put(ACKS_CONFIG, "all");
+ if (tracingEnabled) {
+ producerConfigProperties.put(INTERCEPTOR_CLASSES_CONFIG,
+ TracingProducerInterceptor.class.getName());
+ }
+ final DefaultKafkaProducerFactory<String, CloudEvent> defaultKafkaProducerFactory =
+ new DefaultKafkaProducerFactory<>(producerConfigProperties);
+ defaultKafkaProducerFactory.setTransactionIdPrefix("cps-" + transactionIdPrefix + CPS_NCMP_INSTANCE_UUID + "-");
+ defaultKafkaProducerFactory.setProducerPerThread(true);
+ return defaultKafkaProducerFactory;
+ }
+
+ /**
+ * Consumer factory with read_committed isolation level for exactly once semantics.
+ *
+ * @return consumer factory instance
+ */
+ @Bean
+ public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactoryForEos() {
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
+ consumerConfigProperties.put(ISOLATION_LEVEL_CONFIG, "read_committed");
+ consumerConfigProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false);
+ consumerConfigProperties.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerConfigProperties.put(MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+ if (tracingEnabled) {
+ consumerConfigProperties.put(INTERCEPTOR_CLASSES_CONFIG,
+ TracingConsumerInterceptor.class.getName());
+ }
+ return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+ }
+
+ /**
+ * Kafka template for exactly-once semantics.
+ *
+ * @return kafka template instance
+ */
+ @Bean(name = "cloudEventKafkaTemplateForEos")
+ public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos(
+ @Qualifier("cloudEventProducerFactoryForEos")
+ final ProducerFactory<String, CloudEvent> producerFactory,
+ @Qualifier("cloudEventConsumerFactoryForEos")
+ final ConsumerFactory<String, CloudEvent> consumerFactory) {
+ final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(producerFactory);
+ kafkaTemplate.setConsumerFactory(consumerFactory);
+ if (tracingEnabled) {
+ kafkaTemplate.setObservationEnabled(true);
+ }
+ return kafkaTemplate;
+ }
+
+ /**
+ * Listener container factory with BATCH acknowledgment mode and infinite retries.
+ *
+ * @return listener container factory instance
+ */
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
+ cloudEventConcurrentKafkaListenerContainerFactoryForEos(
+ @Qualifier("cloudEventConsumerFactoryForEos")
+ final ConsumerFactory<String, CloudEvent> consumerFactory,
+ @Qualifier("kafkaEosTransactionManager")
+ final KafkaTransactionManager<String, CloudEvent> transactionManager) {
+ final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ containerFactory.setConsumerFactory(consumerFactory);
+ containerFactory.setConcurrency(concurrency);
+ containerFactory.setBatchListener(true);
+ containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
+ containerFactory.getContainerProperties().setAckMode(BATCH);
+ containerFactory.getContainerProperties().setKafkaAwareTransactionManager(transactionManager);
+ containerFactory.setCommonErrorHandler(kafkaErrorHandlerForEos());
+ if (tracingEnabled) {
+ containerFactory.getContainerProperties().setObservationEnabled(true);
+ }
+ return containerFactory;
+ }
+
+ @Bean(name = "kafkaEosTransactionManager")
+ public KafkaTransactionManager<String, CloudEvent> kafkaTransactionManagerForEos(
+ @Qualifier("cloudEventProducerFactoryForEos")
+ final ProducerFactory<String, CloudEvent> producerFactory) {
+ return new KafkaTransactionManager<>(producerFactory);
+ }
+
+ private DefaultErrorHandler kafkaErrorHandlerForEos() {
+ final ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries =
+ new ExponentialBackOffWithMaxRetries(Integer.MAX_VALUE);
+ exponentialBackOffWithMaxRetries.setInitialInterval(TimeUnit.SECONDS.toMillis(1));
+ exponentialBackOffWithMaxRetries.setMultiplier(2.0);
+ exponentialBackOffWithMaxRetries.setMaxInterval(TimeUnit.SECONDS.toMillis(30));
+ final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(exponentialBackOffWithMaxRetries);
+ defaultErrorHandler.addRetryableExceptions(KafkaException.class);
+ return defaultErrorHandler;
+ }
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023-2026 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.config
+
+import io.cloudevents.CloudEvent
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.spockframework.spring.EnableSharedInjection
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.ProducerFactory
+import org.springframework.kafka.listener.ContainerProperties
+import org.springframework.kafka.transaction.KafkaTransactionManager
+import org.springframework.test.context.TestPropertySource
+import spock.lang.Shared
+import spock.lang.Specification
+
+@SpringBootTest(classes = [KafkaProperties, EosKafkaConfig])
+@EnableSharedInjection
+@EnableConfigurationProperties
+@TestPropertySource(properties = [
+ "ncmp.kafka.eos.enabled=true",
+ "ncmp.notifications.avc-event-producer.transaction-id-prefix=tx-myPrefix-",
+ "ncmp.notifications.avc-event-consumer.concurrency=2",
+ "ncmp.notifications.avc-event-consumer.max-poll-records=500"
+])
+class EosKafkaConfigSpec extends Specification {
+
+ @Shared
+ @Autowired
+ ConsumerFactory<String, CloudEvent> cloudEventConsumerFactoryForEos
+
+ @Shared
+ @Autowired
+ ProducerFactory<String, CloudEvent> cloudEventProducerFactoryForEos
+
+ @Shared
+ @Autowired
+ ConcurrentKafkaListenerContainerFactory<String, CloudEvent> cloudEventConcurrentKafkaListenerContainerFactoryForEos
+
+ def 'Verify EOS kafka configuration'() {
+ expect: 'consumer has read_committed isolation level'
+ cloudEventConsumerFactoryForEos.configurationProperties[ConsumerConfig.ISOLATION_LEVEL_CONFIG] == 'read_committed'
+ and: 'consumer has auto commit disabled'
+ cloudEventConsumerFactoryForEos.configurationProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] == false
+ and: 'consumer has max poll records configured'
+ cloudEventConsumerFactoryForEos.configurationProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] == '500'
+ and: 'listener uses BATCH ack mode'
+ cloudEventConcurrentKafkaListenerContainerFactoryForEos.containerProperties.ackMode == ContainerProperties.AckMode.BATCH
+ and: 'listener is batch listener'
+ cloudEventConcurrentKafkaListenerContainerFactoryForEos.batchListener == true
+ and: 'concurrency is configured'
+ cloudEventConcurrentKafkaListenerContainerFactoryForEos.concurrency == 2
+ and: 'producer transaction ID prefix is as expected'
+ cloudEventProducerFactoryForEos.transactionIdPrefix.startsWith('cps-tx-myPrefix-')
+ and: 'KafkaTransactionManager is used instead of primary transaction manager'
+ cloudEventConcurrentKafkaListenerContainerFactoryForEos.containerProperties.kafkaAwareTransactionManager instanceof KafkaTransactionManager
+ }
+}