From: emaclee Date: Mon, 23 Feb 2026 09:25:46 +0000 (+0000) Subject: Add toggle-able Exactly Once Semantics config X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=bcfa51534a8c7c96fdda9127a1e132daf6a15d03;p=cps.git Add toggle-able Exactly Once Semantics config Issue-ID: CPS-3159 Change-Id: I50a497867a560b58be0ee2fe9f795a451cf839f9 Signed-off-by: emaclee --- diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/EosKafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/EosKafkaConfig.java new file mode 100644 index 0000000000..1c961c83ec --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/EosKafkaConfig.java @@ -0,0 +1,191 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2026 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.config; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; +import static org.springframework.kafka.listener.ContainerProperties.AckMode.BATCH; + +import io.cloudevents.CloudEvent; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; +import java.time.Duration; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.ssl.SslBundles; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.KafkaException; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 
+import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries; +import org.springframework.kafka.transaction.KafkaTransactionManager; + +/** + * Kafka Configuration for Exactly Once Semantics using cloud events. + * + *

Note: When concurrency > 1, message ordering within a partition is NOT guaranteed. + * Use concurrency = 1 if strict ordering is required. + */ +@Configuration +@EnableKafka +@RequiredArgsConstructor +@ConditionalOnProperty(name = "ncmp.kafka.eos.enabled") +public class EosKafkaConfig { + + private final KafkaProperties kafkaProperties; + + @Value("${cps.tracing.enabled:false}") + private boolean tracingEnabled; + + @Value("${ncmp.notifications.avc-event-producer.transaction-id-prefix:tx-}") + private String transactionIdPrefix; + + @Value("${ncmp.notifications.avc-event-consumer.concurrency:1}") + private int concurrency; + + @Value("${ncmp.notifications.avc-event-consumer.max-poll-records:500}") + private String maxPollRecords; + + private static final UUID CPS_NCMP_INSTANCE_UUID = UUID.randomUUID(); + private static final SslBundles NO_SSL = null; + + /** + * Producer factory configured for exactly-once semantics. + * + * @return producer factory instance + */ + @Bean + public ProducerFactory cloudEventProducerFactoryForEos() { + final Map producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL); + producerConfigProperties.put(ENABLE_IDEMPOTENCE_CONFIG, true); + producerConfigProperties.put(ACKS_CONFIG, "all"); + if (tracingEnabled) { + producerConfigProperties.put(INTERCEPTOR_CLASSES_CONFIG, + TracingProducerInterceptor.class.getName()); + } + final DefaultKafkaProducerFactory defaultKafkaProducerFactory = + new DefaultKafkaProducerFactory<>(producerConfigProperties); + defaultKafkaProducerFactory.setTransactionIdPrefix("cps-" + transactionIdPrefix + CPS_NCMP_INSTANCE_UUID + "-"); + defaultKafkaProducerFactory.setProducerPerThread(true); + return defaultKafkaProducerFactory; + } + + /** + * Consumer factory with read_committed isolation level for exactly once semantics. 
+ * + * @return consumer factory instance + */ + @Bean + public ConsumerFactory cloudEventConsumerFactoryForEos() { + final Map consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL); + consumerConfigProperties.put(ISOLATION_LEVEL_CONFIG, "read_committed"); + consumerConfigProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false); + consumerConfigProperties.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerConfigProperties.put(MAX_POLL_RECORDS_CONFIG, maxPollRecords); + if (tracingEnabled) { + consumerConfigProperties.put(INTERCEPTOR_CLASSES_CONFIG, + TracingConsumerInterceptor.class.getName()); + } + return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); + } + + /** + * Kafka template for exactly-once semantics. + * + * @return kafka template instance + */ + @Bean(name = "cloudEventKafkaTemplateForEos") + public KafkaTemplate cloudEventKafkaTemplateForEos( + @Qualifier("cloudEventProducerFactoryForEos") + final ProducerFactory producerFactory, + @Qualifier("cloudEventConsumerFactoryForEos") + final ConsumerFactory consumerFactory) { + final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory); + kafkaTemplate.setConsumerFactory(consumerFactory); + if (tracingEnabled) { + kafkaTemplate.setObservationEnabled(true); + } + return kafkaTemplate; + } + + /** + * Listener container factory with BATCH acknowledgment mode and infinite retries. 
+ * + * @return listener container factory instance + */ + @Bean + public ConcurrentKafkaListenerContainerFactory + cloudEventConcurrentKafkaListenerContainerFactoryForEos( + @Qualifier("cloudEventConsumerFactoryForEos") + final ConsumerFactory consumerFactory, + @Qualifier("kafkaEosTransactionManager") + final KafkaTransactionManager transactionManager) { + final ConcurrentKafkaListenerContainerFactory containerFactory = + new ConcurrentKafkaListenerContainerFactory<>(); + containerFactory.setConsumerFactory(consumerFactory); + containerFactory.setConcurrency(concurrency); + containerFactory.setBatchListener(true); + containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10)); + containerFactory.getContainerProperties().setAckMode(BATCH); + containerFactory.getContainerProperties().setKafkaAwareTransactionManager(transactionManager); + containerFactory.setCommonErrorHandler(kafkaErrorHandlerForEos()); + if (tracingEnabled) { + containerFactory.getContainerProperties().setObservationEnabled(true); + } + return containerFactory; + } + + @Bean(name = "kafkaEosTransactionManager") + public KafkaTransactionManager kafkaTransactionManagerForEos( + @Qualifier("cloudEventProducerFactoryForEos") + final ProducerFactory producerFactory) { + return new KafkaTransactionManager<>(producerFactory); + } + + private DefaultErrorHandler kafkaErrorHandlerForEos() { + final ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries = + new ExponentialBackOffWithMaxRetries(Integer.MAX_VALUE); + exponentialBackOffWithMaxRetries.setInitialInterval(TimeUnit.SECONDS.toMillis(1)); + exponentialBackOffWithMaxRetries.setMultiplier(2.0); + exponentialBackOffWithMaxRetries.setMaxInterval(TimeUnit.SECONDS.toMillis(30)); + final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(exponentialBackOffWithMaxRetries); + defaultErrorHandler.addRetryableExceptions(KafkaException.class); + return defaultErrorHandler; + } +} diff --git 
a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/EosKafkaConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/EosKafkaConfigSpec.groovy new file mode 100644 index 0000000000..9ef21834e8 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/EosKafkaConfigSpec.groovy @@ -0,0 +1,80 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023-2026 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

package org.onap.cps.ncmp.config

import io.cloudevents.CloudEvent
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.spockframework.spring.EnableSharedInjection
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.transaction.KafkaTransactionManager
import org.springframework.test.context.TestPropertySource
import spock.lang.Shared
import spock.lang.Specification

/**
 * Verifies the EOS (exactly-once semantics) Kafka wiring produced by EosKafkaConfig:
 * the config is only active when 'ncmp.kafka.eos.enabled' is set (supplied below),
 * and the producer/consumer/listener beans carry the expected transactional settings.
 */
@SpringBootTest(classes = [KafkaProperties, EosKafkaConfig])
@EnableSharedInjection
@EnableConfigurationProperties
@TestPropertySource(properties = [
    "ncmp.kafka.eos.enabled=true",
    "ncmp.notifications.avc-event-producer.transaction-id-prefix=tx-myPrefix-",
    "ncmp.notifications.avc-event-consumer.concurrency=2",
    "ncmp.notifications.avc-event-consumer.max-poll-records=500"
])
class EosKafkaConfigSpec extends Specification {

    // Beans under test, injected by name from EosKafkaConfig (shared across features).
    @Shared
    @Autowired
    ConsumerFactory cloudEventConsumerFactoryForEos

    @Shared
    @Autowired
    ProducerFactory cloudEventProducerFactoryForEos

    @Shared
    @Autowired
    ConcurrentKafkaListenerContainerFactory cloudEventConcurrentKafkaListenerContainerFactoryForEos

    def 'Verify EOS kafka configuration'() {
        expect: 'consumer has read_committed isolation level'
            cloudEventConsumerFactoryForEos.configurationProperties[ConsumerConfig.ISOLATION_LEVEL_CONFIG] == 'read_committed'
        and: 'consumer has auto commit disabled'
            cloudEventConsumerFactoryForEos.configurationProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] == false
        and: 'consumer has max poll records configured'
            // Compared as a String: EosKafkaConfig stores max-poll-records as a String property.
            cloudEventConsumerFactoryForEos.configurationProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] == '500'
        and: 'listener uses BATCH ack mode'
            cloudEventConcurrentKafkaListenerContainerFactoryForEos.containerProperties.ackMode == ContainerProperties.AckMode.BATCH
        and: 'listener is batch listener'
            cloudEventConcurrentKafkaListenerContainerFactoryForEos.batchListener == true
        and: 'concurrency is configured'
            cloudEventConcurrentKafkaListenerContainerFactoryForEos.concurrency == 2
        and: 'producer transaction ID prefix is as expected'
            // Only the prefix is checked: the full id also contains a per-instance random UUID.
            cloudEventProducerFactoryForEos.transactionIdPrefix.startsWith('cps-tx-myPrefix-')
        and: 'KafkaTransactionManager is used instead of primary transaction manager'
            cloudEventConcurrentKafkaListenerContainerFactoryForEos.containerProperties.kafkaAwareTransactionManager instanceof KafkaTransactionManager
    }
}