From: emaclee Date: Tue, 6 Jan 2026 09:53:54 +0000 (+0000) Subject: Revert "Implement Exactly Once Semantics for CmAvcEvents flow" X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=9a0e6c61bbd9af1fccabc556f5ffeded1340ad30;p=cps.git Revert "Implement Exactly Once Semantics for CmAvcEvents flow" This reverts commit bab08c745e60f339107bb8265d2e198702f85742. Issue-ID: CPS-3103 Change-Id: Ifd17870ead776dab9a813c6f628efdeb57feba57 Signed-off-by: emaclee --- diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index 743da6db44..ba15152ddc 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -243,10 +243,6 @@ ncmp: model-loader: maximum-attempt-count: 20 - notifications: - avc-event-producer: - transaction-id-prefix: tx- - # Custom Hazelcast Config. hazelcast: cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"} diff --git a/cps-application/src/test/resources/application.yml b/cps-application/src/test/resources/application.yml index 107facb0e7..0bb43b0bb6 100644 --- a/cps-application/src/test/resources/application.yml +++ b/cps-application/src/test/resources/application.yml @@ -237,10 +237,6 @@ ncmp: model-loader: maximum-attempt-count: 20 - notifications: - avc-event-producer: - transaction-id-prefix: test-tx- - # Custom Hazelcast Config. hazelcast: cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java deleted file mode 100644 index 33dfada6a8..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.config; - -import io.cloudevents.CloudEvent; -import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor; -import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; -import java.time.Duration; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.ssl.SslBundles; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.KafkaException; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.DefaultErrorHandler; -import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries; -import org.springframework.kafka.transaction.KafkaTransactionManager; - -/** - * kafka Configuration for implementing Exactly Once Semantics using cloud events. - */ -@Configuration -@EnableKafka -@RequiredArgsConstructor -public class ExactlyOnceSemanticsKafkaConfig { - - private final KafkaProperties kafkaProperties; - - @Value("${cps.tracing.enabled:false}") - private boolean tracingEnabled; - - @Value("${ncmp.notifications.avc-event-producer.transaction-id-prefix:tx-}") - private String transactionIdPrefixForExactlyOnceSemantics; - - private static final SslBundles NO_SSL = null; - - - /** - * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into - * application.yml with CloudEventSerializer.This factory is configured to support - * exactly-once semantics by enabling idempotence and setting a transaction ID prefix. - * - * @return cloud event producer instance. - */ - @Bean - public ProducerFactory cloudEventProducerFactoryForEos() { - final Map producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL); - producerConfigProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - producerConfigProperties.put(ProducerConfig.ACKS_CONFIG, "all"); - if (tracingEnabled) { - producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, - TracingProducerInterceptor.class.getName()); - } - final DefaultKafkaProducerFactory defaultKafkaProducerFactory = - new DefaultKafkaProducerFactory<>(producerConfigProperties); - defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefixForExactlyOnceSemantics); - return defaultKafkaProducerFactory; - } - - /** - * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined - * into application.yml having CloudEventDeserializer as deserializer-value.This factory is configured with - * read_committed isolation level to support exactly-once semantics. - * - * @return an instance of cloud consumer factory. - */ - @Bean - public ConsumerFactory cloudEventConsumerFactoryForEos() { - final Map consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL); - consumerConfigProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); - if (tracingEnabled) { - consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, - TracingConsumerInterceptor.class.getName()); - } - return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); - } - - - /** - * A cloud Kafka event template for executing high-level operations. The template is configured using the cloud - * event producer and consumer factories to support - * exactly-once semantics. - * - * @return an instance of cloud Kafka template. - */ - @Bean(name = "cloudEventKafkaTemplateForEos") - public KafkaTemplate cloudEventKafkaTemplateForEos() { - final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactoryForEos()); - kafkaTemplate.setConsumerFactory(cloudEventConsumerFactoryForEos()); - if (tracingEnabled) { - kafkaTemplate.setObservationEnabled(true); - } - return kafkaTemplate; - } - - /** - * A Concurrent CloudEvent kafka listener container factory. - * This factory supports exactly-once semantics, retry handling, and optional tracing. - * - * @return instance of Concurrent kafka listener factory - */ - @Bean - public ConcurrentKafkaListenerContainerFactory - cloudEventConcurrentKafkaListenerContainerFactoryForEos() { - final ConcurrentKafkaListenerContainerFactory containerFactory = - new ConcurrentKafkaListenerContainerFactory<>(); - containerFactory.setConsumerFactory(cloudEventConsumerFactoryForEos()); - containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10)); - containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD); - containerFactory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManagerForEos()); - containerFactory.setCommonErrorHandler(kafkaErrorHandlerWithMaxRetriesForEos()); - if (tracingEnabled) { - containerFactory.getContainerProperties().setObservationEnabled(true); - } - - return containerFactory; - } - - private KafkaTransactionManager kafkaTransactionManagerForEos() { - return new KafkaTransactionManager<>(cloudEventProducerFactoryForEos()); - } - - private DefaultErrorHandler kafkaErrorHandlerWithMaxRetriesForEos() { - - final ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries = - new ExponentialBackOffWithMaxRetries(Integer.MAX_VALUE); - exponentialBackOffWithMaxRetries.setInitialInterval(1000L); // 1 sec - exponentialBackOffWithMaxRetries.setMultiplier(2.0); - exponentialBackOffWithMaxRetries.setMaxInterval(30_000L); // 30 sec - final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(exponentialBackOffWithMaxRetries); - defaultErrorHandler.addRetryableExceptions(KafkaException.class); - - return defaultErrorHandler; - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java index 6ff9dcbcae..be70833271 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java @@ -99,7 +99,8 @@ public class KafkaConfig { * * @return an instance of legacy Kafka template. */ - @Bean(name = "legacyEventKafkaTemplate") + @Bean + @Primary public KafkaTemplate legacyEventKafkaTemplate() { final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory()); @@ -165,8 +166,7 @@ public class KafkaConfig { * * @return an instance of cloud Kafka template. */ - @Primary - @Bean(name = "cloudEventKafkaTemplate") + @Bean public KafkaTemplate cloudEventKafkaTemplate() { final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory()); kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory()); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java index cb5dc2e2f8..796658601e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (c) 2023-2025 OpenInfra Foundation Europe. All rights reserved. + * Copyright (c) 2023-2026 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,7 +36,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; /** * Listener for AVC events based on CM Subscriptions. @@ -64,9 +63,8 @@ public class CmAvcEventConsumer { * * @param cmAvcEventAsConsumerRecord Incoming raw consumer record */ - @Transactional @KafkaListener(topics = "${app.dmi.cm-events.topic}", - containerFactory = "cloudEventConcurrentKafkaListenerContainerFactoryForEos") + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") @Timed(value = "cps.ncmp.cm.notifications.consume.and.forward", description = "Time taken to forward CM AVC events") public void consumeAndForward(final ConsumerRecord cmAvcEventAsConsumerRecord) { if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) { @@ -74,9 +72,8 @@ public class CmAvcEventConsumer { } final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value(); final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key(); - log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent); - eventProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent); + eventProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent); } private void processCmAvcEventChanges(final ConsumerRecord cmAvcEventAsConsumerRecord) { @@ -93,6 +90,6 @@ public class CmAvcEventConsumer { private boolean isEventFromOnapDmiPlugin(final Headers headers) { final String sourceSystem = KafkaHeaders.getParsedKafkaHeader(headers, CLOUD_EVENT_SOURCE_SYSTEM_HEADER_KEY); - return "ONAP-DMI-PLUGIN".equals(sourceSystem); + return sourceSystem != null && sourceSystem.equals("ONAP-DMI-PLUGIN"); } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy index 6ffe000859..e4db82e587 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. + * Copyright (C) 2023-2026 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,6 @@ import io.cloudevents.kafka.CloudEventSerializer import org.onap.cps.events.LegacyEvent import org.spockframework.spring.EnableSharedInjection import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Qualifier import org.springframework.boot.autoconfigure.kafka.KafkaProperties import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.boot.test.context.SpringBootTest @@ -37,7 +36,7 @@ import org.springframework.test.context.TestPropertySource import spock.lang.Shared import spock.lang.Specification -@SpringBootTest(classes = [KafkaProperties, KafkaConfig, ExactlyOnceSemanticsKafkaConfig]) +@SpringBootTest(classes = [KafkaProperties, KafkaConfig]) @EnableSharedInjection @EnableConfigurationProperties @TestPropertySource(properties = ["cps.tracing.enabled=true"]) @@ -45,20 +44,12 @@ class KafkaConfigSpec extends Specification { @Shared @Autowired - @Qualifier("legacyEventKafkaTemplate") KafkaTemplate legacyEventKafkaTemplate @Shared @Autowired - @Qualifier("cloudEventKafkaTemplate") KafkaTemplate cloudEventKafkaTemplate - @Shared - @Autowired - @Qualifier("cloudEventKafkaTemplateForEos") - KafkaTemplate cloudEventKafkaTemplateForEos - - def 'Verify kafka template serializer and deserializer configuration for #eventType.'() { expect: 'kafka template is instantiated' assert kafkaTemplateInstance.properties['beanName'] == beanName @@ -67,9 +58,8 @@ class KafkaConfigSpec extends Specification { and: 'verify event key and value deserializer' assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName()) where: 'the following event type is used' - eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer - 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer - 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer - 'transactional cloud event' | cloudEventKafkaTemplateForEos || 'cloudEventKafkaTemplateForEos' | CloudEventSerializer | CloudEventDeserializer + eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer + 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer + 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy index efd0055b69..b0b200a01b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy @@ -44,7 +44,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase @SpringBean EventProducer cpsAsyncRequestResponseEventProducer = - new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate); @SpringBean diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy index 9c32ce9fe1..166ebdfacd 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy @@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent class DataOperationEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventProducer asyncDataOperationEventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + EventProducer asyncDataOperationEventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy index 0d6fda3b92..943377a970 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy @@ -55,7 +55,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec { JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) @SpringBean - EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) def 'Process per data operation request with #serviceName.'() { given: 'data operation request with 3 operations' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy index e8b860fbcd..5912916ee2 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy @@ -51,13 +51,13 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent class CmAvcEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) def mockCmAvcEventService = Mock(CmAvcEventService) def mockInventoryPersistence = Mock(InventoryPersistence) @SpringBean - CmAvcEventConsumer objectUnderTest = new CmAvcEventConsumer(eventsProducer, mockCmAvcEventService, mockInventoryPersistence) + CmAvcEventConsumer objectUnderTest = new CmAvcEventConsumer(eventProducer, mockCmAvcEventService, mockInventoryPersistence) @Autowired JsonObjectMapper jsonObjectMapper @@ -148,4 +148,4 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec { .withSource(URI.create(sourceSystem as String)) .withExtension('correlationid', cmHandleId).build() } -} +} \ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventProducerSpec.groovy index ea57d75b1e..3845f69302 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventProducerSpec.groovy @@ -45,10 +45,11 @@ import java.time.Duration class EventProducerSpec extends MessagingBaseSpec { def legacyEventKafkaConsumer = new KafkaConsumer(eventConsumerConfigProperties('ncmp-group', StringDeserializer)) + def testTopic = 'ncmp-events-test' @SpringBean - EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @Autowired JsonObjectMapper jsonObjectMapper diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/CloudEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/CloudEventMapperSpec.groovy new file mode 100644 index 0000000000..273f978722 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/CloudEventMapperSpec.groovy @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2026 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.utils.events + +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.Logger +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.read.ListAppender +import io.cloudevents.CloudEvent +import io.cloudevents.core.builder.CloudEventBuilder +import org.slf4j.LoggerFactory +import spock.lang.Specification + +class CloudEventMapperSpec extends Specification { + + def logger = Spy(ListAppender) + + void setup() { + def setupLogger = ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)) + setupLogger.setLevel(Level.DEBUG) + setupLogger.addAppender(logger) + logger.start() + } + + void cleanup() { + ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders() + } + + def 'Map cloud event with runtime exception'() { + given: 'a cloud event with invalid data' + def cloudEvent = CloudEventBuilder.v1() + .withId('test-id') + .withType('test-type') + .withSource(URI.create('test-source')) + .withData('invalid-json-data'.bytes) + .build() + when: 'mapping to target event class' + CloudEventMapper.toTargetEvent(cloudEvent, String.class) + then: 'error message is logged' + def loggingEvents = logger.list + assert loggingEvents.size() >= 1 + def errorEvent = loggingEvents.find { it.level == Level.ERROR } + assert errorEvent.formattedMessage.contains('Unable to map cloud event to target event class type') + assert errorEvent.formattedMessage.contains('class java.lang.String') + } +} \ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy index 58e33945ae..39b549e4c5 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy @@ -48,9 +48,6 @@ class MessagingBaseSpec extends Specification { def legacyEventKafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory(eventProducerConfigProperties(JsonSerializer))) def cloudEventKafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory(eventProducerConfigProperties(CloudEventSerializer))) - - def cloudEventKafkaTemplateForEos = new KafkaTemplate(new DefaultKafkaProducerFactory(eventProducerConfigProperties(CloudEventSerializer))) - @DynamicPropertySource static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) diff --git a/cps-service/src/main/java/org/onap/cps/events/EventProducer.java b/cps-service/src/main/java/org/onap/cps/events/EventProducer.java index b0fc551412..8369d756e4 100644 --- a/cps-service/src/main/java/org/onap/cps/events/EventProducer.java +++ b/cps-service/src/main/java/org/onap/cps/events/EventProducer.java @@ -29,7 +29,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.kafka.KafkaException; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -55,9 +54,6 @@ public class EventProducer { @Qualifier("cloudEventKafkaTemplate") private final KafkaTemplate cloudEventKafkaTemplate; - @Qualifier("cloudEventKafkaTemplateForEos") - private final KafkaTemplate cloudEventKafkaTemplateForEos; - /** * Generic CloudEvent sender. * @@ -68,7 +64,7 @@ public class EventProducer { public void sendCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { final CompletableFuture> eventFuture = cloudEventKafkaTemplate.send(topicName, eventKey, event); - eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false)); + eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e)); } /** @@ -86,7 +82,6 @@ public class EventProducer { handleLegacyEventCallback(topicName, eventFuture); } - /** * Legacy Event sender with headers in a Map. Schemas that implement LegacyEvent are eligible to use this method. * @@ -107,22 +102,10 @@ public class EventProducer { handleLegacyEventCallback(topicName, eventFuture); } - /** - * Generic CloudEvent sender ensuring Exactly Once Semantics behaviour. - * - * @param topicName valid topic name - * @param eventKey message key - * @param event message payload - */ - public void sendCloudEventUsingEos(final String topicName, final String eventKey, final CloudEvent event) { - final CompletableFuture> eventFuture = - cloudEventKafkaTemplateForEos.send(topicName, eventKey, event); - eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, true)); - } private void handleLegacyEventCallback(final String topicName, final CompletableFuture> eventFuture) { - eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false)); + eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e)); } private Headers convertToKafkaHeaders(final Map headersAsMap) { @@ -131,16 +114,12 @@ public class EventProducer { return headers; } - private static void logOutcome(final String topicName, final SendResult result, final Throwable e, - final boolean throwKafkaException) { + private static void logOutcome(final String topicName, final SendResult result, final Throwable e) { if (e == null) { final Object event = result.getProducerRecord().value(); log.debug("Successfully sent event to topic : {} , Event : {}", topicName, event); } else { log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage()); - if (throwKafkaException && e instanceof KafkaException) { - throw (KafkaException) e; - } } } diff --git a/cps-service/src/test/groovy/org/onap/cps/events/EventProducerSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/EventProducerSpec.groovy index 528a859e07..ef8cafb1ab 100644 --- a/cps-service/src/test/groovy/org/onap/cps/events/EventProducerSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/events/EventProducerSpec.groovy @@ -32,7 +32,6 @@ import org.apache.kafka.common.header.Headers import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.support.SendResult -import org.springframework.util.SerializationUtils import spock.lang.Specification import java.util.concurrent.CompletableFuture @@ -41,7 +40,6 @@ class EventProducerSpec extends Specification { def mockLegacyKafkaTemplate = Mock(KafkaTemplate) def mockCloudEventKafkaTemplate = Mock(KafkaTemplate) - def mockCloudEventKafkaTemplateForEos = Mock(KafkaTemplate) def logger = Spy(ListAppender) void setup() { @@ -55,7 +53,7 @@ class EventProducerSpec extends Specification { ((Logger) LoggerFactory.getLogger(EventProducer.class)).detachAndStopAllAppenders() } - def objectUnderTest = new EventProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos) + def objectUnderTest = new EventProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate) def 'Send Cloud Event'() { given: 'a successfully sent event' @@ -73,22 +71,6 @@ class EventProducerSpec extends Specification { assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') } - def 'Send Cloud Event Using EOS'() { - given: 'a successfull result from send event with EOS semantics' - def eventFuture = CompletableFuture.completedFuture( - new SendResult( - new ProducerRecord('eos-topic', 'some-value'), - new RecordMetadata(new TopicPartition('eos-topic', 0), 0, 0, 0, 0, 0) - ) - ) - def someCloudEvent = Mock(CloudEvent) - 1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFuture - when: 'sending the cloud event using EOS' - objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent) - then: 'the correct debug message is logged' - assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true - } - def 'Send Cloud Event with Exception'() { given: 'a failed event' def eventFutureWithFailure = new CompletableFuture>() @@ -101,19 +83,6 @@ class EventProducerSpec extends Specification { assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true } - def 'Send Cloud Event Using EOS with KafkaException'() { - given: 'an event fails with KafkaException' - def kafkaException = new org.springframework.kafka.KafkaException('some kafka exception') - def eventFutureWithFailure = new CompletableFuture>() - eventFutureWithFailure.completeExceptionally(kafkaException) - def someCloudEvent = Mock(CloudEvent) - 1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure - when: 'sending the cloud event using EOS' - objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent) - then: 'the correct error message is logged' - assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true - } - def 'Send Legacy Event'() { given: 'a successfully sent event' def eventFuture = CompletableFuture.completedFuture( @@ -132,7 +101,7 @@ class EventProducerSpec extends Specification { def 'Send Legacy Event with Headers as Map'() { given: 'a successfully sent event' - def sampleEventHeaders = ['k1': SerializationUtils.serialize('v1')] + def sampleEventHeaders = ['k1': 'v1', 'k2': 'v2'] def eventFuture = CompletableFuture.completedFuture( new SendResult( new ProducerRecord('some-topic', 'some-value'), @@ -178,10 +147,10 @@ class EventProducerSpec extends Specification { getProducerRecord() >> Mock(ProducerRecord) } def runtimeException = new RuntimeException('some runtime exception') - def logOutcomeMethod = EventProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean) + def logOutcomeMethod = EventProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable) logOutcomeMethod.accessible = true when: 'logging the outcome with throwKafkaException set to true' - logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException, true) + logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException) then: 'error message is logged' assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true } @@ -203,4 +172,4 @@ class EventProducerSpec extends Specification { lastLoggingEvent.level == expectedLevel && lastLoggingEvent.formattedMessage.contains(expectedFormattedMessage) } -} +} \ No newline at end of file diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml index 2ff7dcf754..5834689b06 100644 --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -164,8 +164,6 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,CONNECTIONS_FROM_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 healthcheck: test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1 interval: 10s diff --git a/integration-test/src/test/resources/application.yml b/integration-test/src/test/resources/application.yml index a59c0b8c6f..efe595ed7f 100644 --- a/integration-test/src/test/resources/application.yml +++ b/integration-test/src/test/resources/application.yml @@ -184,10 +184,6 @@ ncmp: model-loader: maximum-attempt-count: 20 - notifications: - avc-event-producer: - transaction-id-prefix: integration-test-tx- - servlet: multipart: enabled: true