From bab08c745e60f339107bb8265d2e198702f85742 Mon Sep 17 00:00:00 2001
From: mpriyank
Date: Wed, 24 Sep 2025 16:45:13 +0100
Subject: [PATCH] Implement Exactly Once Semantics for CmAvcEvents flow

- introduced exactly once processing for the Cm Avc Event forwarding flow
- corresponding unit test and integration test added

Issue-ID: CPS-2984
Change-Id: I4eea7c15fe9c2ebbdb441bff669ced365d17510d
Signed-off-by: mpriyank
---
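Note (this section is ignored by git am): the change pairs a transactional CloudEvent
producer (idempotence enabled, transaction id prefix, see ExactlyOnceSemanticsKafkaConfig
below) with a read_committed consumer and a KafkaTransactionManager, so forwarding an AVC
event and committing the consumer offset become one atomic unit. As a minimal sketch of
what the read_committed isolation level means for anyone consuming the forwarded events,
the plain Java consumer below never observes records from an aborted forwarding
transaction. The bootstrap address, group id and topic name are illustrative assumptions,
not values from this patch.

    import io.cloudevents.CloudEvent;
    import io.cloudevents.kafka.CloudEventDeserializer;
    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReadCommittedConsumerSketch {
        public static void main(final String[] args) {
            final Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-group");        // assumption
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());
            // Hide records belonging to transactions that are still open or were aborted:
            properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            try (KafkaConsumer<String, CloudEvent> consumer = new KafkaConsumer<>(properties)) {
                consumer.subscribe(List.of("cm-events")); // assumption: forwarded-events topic
                consumer.poll(Duration.ofSeconds(1))
                        .forEach(consumerRecord -> System.out.println(
                                consumerRecord.key() + " -> " + consumerRecord.value().getType()));
            }
        }
    }

With the default read_uncommitted isolation such a consumer could see events from
transactions that were later rolled back and retried; read_committed, together with the
broker transaction-state-log settings added to docker-compose in this patch, is what
upgrades the flow to exactly-once.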
 cps-application/src/main/resources/application.yml  |   4 +
 cps-application/src/test/resources/application.yml  |   4 +
 .../config/ExactlyOnceSemanticsKafkaConfig.java     | 165 +++++++++++++++++++++
 .../java/org/onap/cps/ncmp/config/KafkaConfig.java  |   6 +-
 .../subscription/cmavc/CmAvcEventConsumer.java      |  15 +-
 .../onap/cps/ncmp/config/KafkaConfigSpec.groovy     |  18 ++-
 ...AsyncRequestResponseEventIntegrationSpec.groovy  |   2 +-
 .../async/DataOperationEventConsumerSpec.groovy     |   2 +-
 .../data/utils/DmiDataOperationsHelperSpec.groovy   |   2 +-
 .../cmavc/CmAvcEventConsumerSpec.groovy             |   2 +-
 .../inventory/sync/lcm/EventsProducerSpec.groovy    |   5 +-
 .../utils/events/InventoryEventProducerSpec.groovy  |   2 +-
 .../cps/ncmp/utils/events/MessagingBaseSpec.groovy  |   7 +-
 .../java/org/onap/cps/events/EventsProducer.java    |  37 ++++-
 .../org/onap/cps/events/EventsProducerSpec.groovy   |  87 +++++++----
 docker-compose/docker-compose.yml                   |   2 +
 .../integration/base/CpsIntegrationSpecBase.groovy  |  12 +-
 .../ncmp/data/LegacyBatchDataOperationSpec.groovy   |   8 +-
 .../subscription/CmSubscriptionSpec.groovy          |   4 +-
 .../ncmp/inventory/CmHandleCreateSpec.groovy        |  18 ++-
 .../ncmp/inventory/CmHandleUpdateSpec.groovy        |  17 ++-
 .../onap/cps/integration/KafkaTestContainer.java    |  11 +-
 integration-test/src/test/resources/application.yml |   4 +
 23 files changed, 359 insertions(+), 75 deletions(-)
 create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java

diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 90e5ac1a42..bfc5339338 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -240,6 +240,10 @@ ncmp:
     model-loader:
         maximum-attempt-count: 20

+    notifications:
+        avc-event-producer:
+            transaction-id-prefix: tx-
+
 # Custom Hazelcast Config.
 hazelcast:
     cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
diff --git a/cps-application/src/test/resources/application.yml b/cps-application/src/test/resources/application.yml
index 0bb43b0bb6..107facb0e7 100644
--- a/cps-application/src/test/resources/application.yml
+++ b/cps-application/src/test/resources/application.yml
@@ -237,6 +237,10 @@ ncmp:
     model-loader:
         maximum-attempt-count: 20

+    notifications:
+        avc-event-producer:
+            transaction-id-prefix: test-tx-
+
 # Custom Hazelcast Config.
 hazelcast:
     cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java
new file mode 100644
index 0000000000..33dfada6a8
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ExactlyOnceSemanticsKafkaConfig.java
@@ -0,0 +1,165 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.config;
+
+import io.cloudevents.CloudEvent;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
+import java.time.Duration;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.ssl.SslBundles;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.KafkaException;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
+import org.springframework.kafka.transaction.KafkaTransactionManager;
+
+/**
+ * Kafka configuration for implementing exactly-once semantics using cloud events.
+ */
+@Configuration
+@EnableKafka
+@RequiredArgsConstructor
+public class ExactlyOnceSemanticsKafkaConfig {
+
+    private final KafkaProperties kafkaProperties;
+
+    @Value("${cps.tracing.enabled:false}")
+    private boolean tracingEnabled;
+
+    @Value("${ncmp.notifications.avc-event-producer.transaction-id-prefix:tx-}")
+    private String transactionIdPrefixForExactlyOnceSemantics;
+
+    private static final SslBundles NO_SSL = null;
+
+
+    /**
+     * Sets the strategy for creating a cloud-event Kafka producer instance from the Kafka properties defined in
+     * application.yml, with CloudEventSerializer as the value serializer. This factory is configured to support
+     * exactly-once semantics by enabling idempotence and setting a transaction ID prefix.
+     *
+     * @return cloud event producer instance.
+     */
+    @Bean
+    public ProducerFactory<String, CloudEvent> cloudEventProducerFactoryForEos() {
+        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
+        producerConfigProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        producerConfigProperties.put(ProducerConfig.ACKS_CONFIG, "all");
+        if (tracingEnabled) {
+            producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    TracingProducerInterceptor.class.getName());
+        }
+        final DefaultKafkaProducerFactory<String, CloudEvent> defaultKafkaProducerFactory =
+                new DefaultKafkaProducerFactory<>(producerConfigProperties);
+        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefixForExactlyOnceSemantics);
+        return defaultKafkaProducerFactory;
+    }
+
+    /**
+     * The ConsumerFactory implementation to create cloud-event consumer instances from the Kafka properties defined
+     * in application.yml, with CloudEventDeserializer as the value deserializer. This factory is configured with
+     * the read_committed isolation level to support exactly-once semantics.
+     *
+     * @return an instance of cloud consumer factory.
+     */
+    @Bean
+    public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactoryForEos() {
+        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
+        consumerConfigProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        if (tracingEnabled) {
+            consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    TracingConsumerInterceptor.class.getName());
+        }
+        return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+    }
+
+
+    /**
+     * A cloud Kafka event template for executing high-level operations. The template is configured using the cloud
+     * event producer and consumer factories to support exactly-once semantics.
+     *
+     * @return an instance of cloud Kafka template.
+     */
+    @Bean(name = "cloudEventKafkaTemplateForEos")
+    public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos() {
+        final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactoryForEos());
+        kafkaTemplate.setConsumerFactory(cloudEventConsumerFactoryForEos());
+        if (tracingEnabled) {
+            kafkaTemplate.setObservationEnabled(true);
+        }
+        return kafkaTemplate;
+    }
+
+    /**
+     * A concurrent CloudEvent Kafka listener container factory.
+     * This factory supports exactly-once semantics, retry handling, and optional tracing.
+     *
+     * @return instance of concurrent Kafka listener factory
+     */
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
+            cloudEventConcurrentKafkaListenerContainerFactoryForEos() {
+        final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        containerFactory.setConsumerFactory(cloudEventConsumerFactoryForEos());
+        containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
+        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
+        containerFactory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManagerForEos());
+        containerFactory.setCommonErrorHandler(kafkaErrorHandlerWithMaxRetriesForEos());
+        if (tracingEnabled) {
+            containerFactory.getContainerProperties().setObservationEnabled(true);
+        }
+
+        return containerFactory;
+    }
+
+    private KafkaTransactionManager<String, CloudEvent> kafkaTransactionManagerForEos() {
+        return new KafkaTransactionManager<>(cloudEventProducerFactoryForEos());
+    }
+
+    private DefaultErrorHandler kafkaErrorHandlerWithMaxRetriesForEos() {
+
+        final ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries =
+                new ExponentialBackOffWithMaxRetries(Integer.MAX_VALUE);
+        exponentialBackOffWithMaxRetries.setInitialInterval(1000L); // 1 sec
+        exponentialBackOffWithMaxRetries.setMultiplier(2.0);
+        exponentialBackOffWithMaxRetries.setMaxInterval(30_000L); // 30 sec
+        final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(exponentialBackOffWithMaxRetries);
+        defaultErrorHandler.addRetryableExceptions(KafkaException.class);
+
+        return defaultErrorHandler;
+    }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java
index be70833271..6ff9dcbcae 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java
@@ -99,8 +99,7 @@ public class KafkaConfig {
      *
      * @return an instance of legacy Kafka template.
      */
-    @Bean
-    @Primary
+    @Bean(name = "legacyEventKafkaTemplate")
     public KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate() {
         final KafkaTemplate<String, LegacyEvent> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
         kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
@@ -166,7 +165,8 @@ public class KafkaConfig {
      *
      * @return an instance of cloud Kafka template.
      */
-    @Bean
+    @Primary
+    @Bean(name = "cloudEventKafkaTemplate")
     public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
         final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
         kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java
index 0eb657388a..95896061a9 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java
@@ -34,8 +34,10 @@ import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
 import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;

 /**
  * Listener for AVC events based on CM Subscriptions.
@@ -64,8 +66,9 @@ public class CmAvcEventConsumer {
      *
      * @param cmAvcEventAsConsumerRecord Incoming raw consumer record
      */
+    @Transactional
     @KafkaListener(topics = "${app.dmi.cm-events.topic}",
-            containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+            containerFactory = "cloudEventConcurrentKafkaListenerContainerFactoryForEos")
     @Timed(value = "cps.ncmp.cm.notifications.consume.and.forward",
             description = "Time taken to forward CM AVC events")
     public void consumeAndForward(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
         if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) {
@@ -73,8 +76,14 @@ public class CmAvcEventConsumer {
         }
         final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
         final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
-        log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
-        eventsProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
+
+        // Only for testing/demo
+        if (outgoingAvcEventKey.equals("retry")) {
+            throw new KafkaException("test kafka exception for testing");
+        }
+
+        log.info("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
+        eventsProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
     }

     private void processCmAvcEventChanges(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy
index b2e42ced26..6ffe000859 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy
@@ -26,6 +26,7 @@ import io.cloudevents.kafka.CloudEventSerializer
 import org.onap.cps.events.LegacyEvent
 import org.spockframework.spring.EnableSharedInjection
 import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Qualifier
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties
 import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.boot.test.context.SpringBootTest
@@ -36,7 +37,7 @@ import org.springframework.test.context.TestPropertySource
 import spock.lang.Shared
 import spock.lang.Specification

-@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig, ExactlyOnceSemanticsKafkaConfig])
 @EnableSharedInjection
 @EnableConfigurationProperties
 @TestPropertySource(properties = ["cps.tracing.enabled=true"])
@@ -44,12 +45,20 @@ class KafkaConfigSpec extends Specification {

     @Shared
     @Autowired
+    @Qualifier("legacyEventKafkaTemplate")
     KafkaTemplate legacyEventKafkaTemplate

     @Shared
     @Autowired
+    @Qualifier("cloudEventKafkaTemplate")
     KafkaTemplate cloudEventKafkaTemplate

+    @Shared
+    @Autowired
+    @Qualifier("cloudEventKafkaTemplateForEos")
+    KafkaTemplate cloudEventKafkaTemplateForEos
+
+
     def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
         expect: 'kafka template is instantiated'
             assert kafkaTemplateInstance.properties['beanName'] == beanName
@@ -58,8 +67,9 @@ class KafkaConfigSpec extends Specification {
         and: 'verify event key and value deserializer'
             assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
         where: 'the following event type is used'
-            eventType      | kafkaTemplateInstance    || beanName                   | valueSerializer      | delegateDeserializer
-            'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer       | JsonDeserializer
-            'cloud event'  | cloudEventKafkaTemplate  || 'cloudEventKafkaTemplate'  | CloudEventSerializer | CloudEventDeserializer
+            eventType                   | kafkaTemplateInstance         || beanName                        | valueSerializer      | delegateDeserializer
+            'legacy event'              | legacyEventKafkaTemplate      || 'legacyEventKafkaTemplate'      | JsonSerializer       | JsonDeserializer
+            'cloud event'               | cloudEventKafkaTemplate       || 'cloudEventKafkaTemplate'       | CloudEventSerializer | CloudEventDeserializer
+            'transactional cloud event' | cloudEventKafkaTemplateForEos || 'cloudEventKafkaTemplateForEos' | CloudEventSerializer | CloudEventDeserializer
     }
 }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
index fec4fba89c..906779c494 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
@@ -44,7 +44,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBaseSpec {

     @SpringBean
     EventsProducer cpsAsyncRequestResponseEventProducer =
-        new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
+        new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)

     @SpringBean
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy
index 7b7faf3d8a..420da6ffb5 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy
@@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 class DataOperationEventConsumerSpec extends MessagingBaseSpec {

     @SpringBean
-    EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)

     @SpringBean
     DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy
index a42bf1fbe1..b55959d5d8 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy
@@ -56,7 +56,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
     JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())

     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)

     def 'Process per data operation request with #serviceName.'() {
         given: 'data operation request with 3 operations'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy
index 5a0980cb02..82d979ed61 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy
@@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 class CmAvcEventConsumerSpec extends MessagingBaseSpec {

     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)

     def mockCmAvcEventService = Mock(CmAvcEventService)
     def mockInventoryPersistence = Mock(InventoryPersistence)
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy
index ed984ec155..5d974fe157 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.LegacyEvent
 import org.onap.cps.ncmp.events.lcm.v1.Event
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.utils.TestUtils
@@ -43,12 +44,12 @@ import java.time.Duration
 @DirtiesContext
 class EventsProducerSpec extends MessagingBaseSpec {

-    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
+    def legacyEventKafkaConsumer = new KafkaConsumer<String, LegacyEvent>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))

     def testTopic = 'ncmp-events-test'

     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)

     @Autowired
     JsonObjectMapper jsonObjectMapper
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy
index 21fc6563c2..1ee936b76a 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy
@@ -32,7 +32,7 @@ import org.springframework.test.context.ContextConfiguration
 @ContextConfiguration(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper])
 class InventoryEventProducerSpec extends MessagingBaseSpec {

-    def mockEventsProducer = Mock(EventsProducer)
+    def mockEventsProducer = Mock(EventsProducer)
     def objectUnderTest = new InventoryEventProducer(mockEventsProducer)

     def 'Send an attribute value change event'() {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy
index ab6c3fddbf..58e33945ae 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/MessagingBaseSpec.groovy
@@ -24,6 +24,7 @@ import io.cloudevents.CloudEvent
 import io.cloudevents.kafka.CloudEventSerializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.cps.events.LegacyEvent
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.kafka.support.serializer.JsonSerializer
@@ -44,9 +45,11 @@ class MessagingBaseSpec extends Specification {

     static kafkaTestContainer = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0")

-    def legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(eventProducerConfigProperties(JsonSerializer)))
+    def legacyEventKafkaTemplate = new KafkaTemplate<String, LegacyEvent>(new DefaultKafkaProducerFactory(eventProducerConfigProperties(JsonSerializer)))

-    def cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(eventProducerConfigProperties(CloudEventSerializer)))
+    def cloudEventKafkaTemplate = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory(eventProducerConfigProperties(CloudEventSerializer)))
+
+    def cloudEventKafkaTemplateForEos = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory(eventProducerConfigProperties(CloudEventSerializer)))

     @DynamicPropertySource
     static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
diff --git a/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java b/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java
index 7d28dc63df..61758a08a4 100644
--- a/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java
+++ b/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java
@@ -28,6 +28,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
@@ -47,10 +49,15 @@ public class EventsProducer {
      * Note: Cloud events should be used. This will be addressed as part of ....
      */

-    private final KafkaTemplate<String, LegacyEvent> legacyKafkaEventTemplate;
+    @Qualifier("legacyEventKafkaTemplate")
+    private final KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate;

+    @Qualifier("cloudEventKafkaTemplate")
     private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;

+    @Qualifier("cloudEventKafkaTemplateForEos")
+    private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos;
+
     /**
      * Generic CloudEvent sender.
      *
@@ -61,7 +68,7 @@ public class EventsProducer {
     public void sendCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
         final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
                 cloudEventKafkaTemplate.send(topicName, eventKey, event);
-        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
+        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
     }

     /**
@@ -75,7 +82,7 @@ public class EventsProducer {
      */
     public void sendLegacyEvent(final String topicName, final String eventKey, final LegacyEvent event) {
         final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
-            legacyKafkaEventTemplate.send(topicName, eventKey, event);
+            legacyEventKafkaTemplate.send(topicName, eventKey, event);
         handleLegacyEventCallback(topicName, eventFuture);
     }

@@ -92,7 +99,7 @@ public class EventsProducer {
         final ProducerRecord<String, LegacyEvent> producerRecord =
                 new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
         final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
-                legacyKafkaEventTemplate.send(producerRecord);
+                legacyEventKafkaTemplate.send(producerRecord);
         handleLegacyEventCallback(topicName, eventFuture);
     }

@@ -109,9 +116,23 @@ public class EventsProducer {
         sendLegacyEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
     }

+    /**
+     * Generic CloudEvent sender ensuring Exactly Once Semantics behaviour.
+     *
+     * @param topicName valid topic name
+     * @param eventKey  message key
+     * @param event     message payload
+     */
+    public void sendCloudEventUsingEos(final String topicName, final String eventKey, final CloudEvent event) {
+        final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
+                cloudEventKafkaTemplateForEos.send(topicName, eventKey, event);
+        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, true));
+    }
+
+
     private void handleLegacyEventCallback(final String topicName,
             final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture) {
-        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
+        eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
     }

     private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
@@ -120,12 +141,16 @@ public class EventsProducer {
         return eventHeaders;
     }

-    private static void logOutcome(final String topicName, final SendResult<?, ?> result, final Throwable e) {
+    private static void logOutcome(final String topicName, final SendResult<?, ?> result, final Throwable e,
+            final boolean throwKafkaException) {
         if (e == null) {
             final Object event = result.getProducerRecord().value();
             log.debug("Successfully sent event to topic : {} , Event : {}", topicName, event);
         } else {
             log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage());
+            if (throwKafkaException && e instanceof KafkaException) {
+                throw (KafkaException) e;
+            }
         }
     }
diff --git a/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy
index bf543787a5..8c71fea492 100644
--- a/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy
+++ b/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy
@@ -41,8 +41,9 @@ import java.util.concurrent.CompletableFuture

 class EventsProducerSpec extends Specification {

-    def legacyKafkaTemplateMock = Mock(KafkaTemplate)
+    def mockLegacyKafkaTemplate = Mock(KafkaTemplate)
     def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
+    def mockCloudEventKafkaTemplateForEos = Mock(KafkaTemplate)

     def logger = Spy(ListAppender)

     void setup() {
@@ -56,7 +57,7 @@ class EventsProducerSpec extends Specification {
         ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
     }

-    def objectUnderTest = new EventsProducer(legacyKafkaTemplateMock, mockCloudEventKafkaTemplate)
+    def objectUnderTest = new EventsProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos)

     def 'Send Cloud Event'() {
         given: 'a successfully sent event'
@@ -71,9 +72,23 @@ class EventsProducerSpec extends Specification {
         when: 'sending the cloud event'
             objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
         then: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event')
+    }
+
+    def 'Send Cloud Event Using EOS'() {
+        given: 'a successful result from send event with EOS semantics'
+            def eventFuture = CompletableFuture.completedFuture(
+                new SendResult(
+                    new ProducerRecord('eos-topic', 'some-value'),
+                    new RecordMetadata(new TopicPartition('eos-topic', 0), 0, 0, 0, 0, 0)
+                )
+            )
+            def someCloudEvent = Mock(CloudEvent)
+            1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFuture
+        when: 'sending the cloud event using EOS'
+            objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
+        then: 'the correct debug message is logged'
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }

     def 'Send Cloud Event with Exception'() {
@@ -85,9 +100,20 @@ class EventsProducerSpec extends Specification {
         when: 'sending the cloud event'
             objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
         then: 'the correct error message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.ERROR
-            assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
+            assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
+    }
+
+    def 'Send Cloud Event Using EOS with KafkaException'() {
+        given: 'an event fails with KafkaException'
+            def kafkaException = new org.springframework.kafka.KafkaException('some kafka exception')
+            def eventFutureWithFailure = new CompletableFuture<SendResult<String, CloudEvent>>()
+            eventFutureWithFailure.completeExceptionally(kafkaException)
+            def someCloudEvent = Mock(CloudEvent)
+            1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure
+        when: 'sending the cloud event using EOS'
+            objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
+        then: 'the correct error message is logged'
+            assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
     }

     def 'Send Legacy Event'() {
         given: 'a successfully sent event'
@@ -99,13 +125,11 @@ class EventsProducerSpec extends Specification {
                 )
             )
             def someLegacyEvent = Mock(LegacyEvent)
-            1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someLegacyEvent) >> eventFuture
+            1 * mockLegacyKafkaTemplate.send('some-topic', 'some-event-key', someLegacyEvent) >> eventFuture
         when: 'sending the cloud event'
             objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', someLegacyEvent)
         then: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }

     def 'Send Legacy Event with Headers as Map'() {
@@ -121,11 +145,9 @@ class EventsProducerSpec extends Specification {
         when: 'sending the legacy event'
             objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
         then: 'event is sent'
-            1 * legacyKafkaTemplateMock.send(_) >> eventFuture
+            1 * mockLegacyKafkaTemplate.send(_) >> eventFuture
         and: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }

     def 'Send Legacy Event with Record Headers'() {
@@ -142,11 +164,9 @@ class EventsProducerSpec extends Specification {
         when: 'sending the legacy event'
             objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
         then: 'event is sent'
-            1 * legacyKafkaTemplateMock.send(_) >> eventFuture
+            1 * mockLegacyKafkaTemplate.send(_) >> eventFuture
         and: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }

     def 'Handle Legacy Event Callback'() {
@@ -160,9 +180,7 @@ class EventsProducerSpec extends Specification {
         when: 'handling legacy event callback'
             objectUnderTest.handleLegacyEventCallback('some-topic', eventFuture)
         then: 'the correct debug message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.DEBUG
-            assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+            assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
     }

     def 'Handle Legacy Event Callback with Exception'() {
         given: 'a failed event'
@@ -172,9 +190,21 @@ class EventsProducerSpec extends Specification {
         when: 'handling legacy event callback'
             objectUnderTest.handleLegacyEventCallback('some-topic', eventFutureWithFailure)
         then: 'the correct error message is logged'
-            def lastLoggingEvent = logger.list[0]
-            assert lastLoggingEvent.level == Level.ERROR
-            assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
+            assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
+    }
+
+    def 'Logging of non-kafka exceptions'() {
+        given: 'a runtime exception that is not KafkaException'
+            def sendResult = Mock(SendResult) {
+                getProducerRecord() >> Mock(ProducerRecord)
+            }
+            def runtimeException = new RuntimeException('some runtime exception')
+            def logOutcomeMethod = EventsProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean)
+            logOutcomeMethod.accessible = true
+        when: 'logging the outcome with throwKafkaException set to true'
+            logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException, true)
+        then: 'error message is logged'
+            assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
     }

     def 'Convert to kafka headers'() {
@@ -189,4 +219,9 @@ class EventsProducerSpec extends Specification {
         assert headers[1].key() == 'key2'
     }

+    def verifyLoggingEvent(expectedLevel, expectedFormattedMessage) {
+        def lastLoggingEvent = logger.list[0]
+        lastLoggingEvent.level == expectedLevel && lastLoggingEvent.formattedMessage.contains(expectedFormattedMessage)
+    }
+
 }
\ No newline at end of file
diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml
index 8b55d07e61..a57539e261 100644
--- a/docker-compose/docker-compose.yml
+++ b/docker-compose/docker-compose.yml
@@ -164,6 +164,8 @@ services:
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,CONNECTIONS_FROM_HOST://localhost:9092
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
     healthcheck:
       test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1
       interval: 10s
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
index b379d6e51f..8084673e30 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
@@ -23,8 +23,6 @@ package org.onap.cps.integration.base

 import com.hazelcast.map.IMap
 import okhttp3.mockwebserver.MockWebServer
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.api.CpsAnchorService
 import org.onap.cps.api.CpsDataService
 import org.onap.cps.api.CpsDataspaceService
@@ -81,8 +79,6 @@
 import java.util.concurrent.BlockingQueue

 @EntityScan('org.onap.cps.ri.models')
 abstract class CpsIntegrationSpecBase extends Specification {

-    static KafkaConsumer kafkaConsumer
-
     @Shared
     DatabaseTestContainer databaseTestContainer = DatabaseTestContainer.getInstance()
@@ -344,13 +340,7 @@ abstract class CpsIntegrationSpecBase extends Specification {
         networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
     }

-    def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
-        kafkaConsumer = KafkaTestContainer.getConsumer(consumerGroupId, StringDeserializer.class)
-        kafkaConsumer.subscribe([topicName])
-        kafkaConsumer.poll(Duration.ofMillis(500))
-    }
-
-    def getLatestConsumerRecordsWithMaxPollOf1Second(numberOfRecordsToRead) {
+    def getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, numberOfRecordsToRead) {
         def consumerRecords = []
         def retryAttempts = 10
         while (consumerRecords.size() < numberOfRecordsToRead) {
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/data/LegacyBatchDataOperationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/data/LegacyBatchDataOperationSpec.groovy
index eb1484b73c..89e38c189f 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/data/LegacyBatchDataOperationSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/data/LegacyBatchDataOperationSpec.groovy
@@ -21,8 +21,6 @@
 package org.onap.cps.integration.functional.ncmp.data

 import io.cloudevents.CloudEvent
-import io.cloudevents.kafka.CloudEventDeserializer
-import java.time.Duration
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
@@ -32,6 +30,8 @@ import org.onap.cps.ncmp.events.async1_0_0.Response
 import org.springframework.http.MediaType
 import spock.util.concurrent.PollingConditions

+import java.time.Duration
+
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status

@@ -40,7 +40,7 @@ class LegacyBatchDataOperationSpec extends CpsIntegrationSpecBase {
     KafkaConsumer kafkaConsumer

     def setup() {
-        kafkaConsumer = KafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
+        kafkaConsumer = KafkaTestContainer.getCloudEventConsumer('test-group')
         kafkaConsumer.subscribe(['legacy-batch-topic'])
         kafkaConsumer.poll(Duration.ofMillis(500))
         dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
@@ -90,7 +90,7 @@ class LegacyBatchDataOperationSpec extends CpsIntegrationSpecBase {
                 .andExpect(status().is2xxSuccessful())

         then: 'there is one kafka message'
-            def consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000))
+            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)
             assert consumerRecords.size() == 1

         and: 'it is a cloud event'
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy
index 5a13cb2b3d..f19a13f81b 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/datajobs/subscription/CmSubscriptionSpec.groovy
@@ -62,7 +62,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     def setup() {
         registerCmHandlesForSubscriptions()
         kafkaTestContainer.start()
-        dmiInConsumer = kafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
+        dmiInConsumer = kafkaTestContainer.getCloudEventConsumer('test-group')
         dmiInConsumer.subscribe([dmiInTopic])
         dmiInConsumer.poll(Duration.ofMillis(500))
         testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
@@ -223,7 +223,7 @@ class CmSubscriptionSpec extends CpsIntegrationSpecBase {
     }

     def getAllConsumedCorrelationIds() {
-        def consumedEvents = dmiInConsumer.poll(Duration.ofMillis(1000))
+        def consumedEvents = getLatestConsumerRecordsWithMaxPollOf1Second(dmiInConsumer, 1)
         def headersMap = getAllHeaders(consumedEvents)
         return headersMap.get('ce_correlationid')
     }
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy
index 2e1c803009..ca8ebb0800 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleCreateSpec.groovy
@@ -20,7 +20,9 @@

 package org.onap.cps.integration.functional.ncmp.inventory

-
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.cps.events.LegacyEvent
+import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.api.NcmpResponseStatus
 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
@@ -31,11 +33,15 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl

+import java.time.Duration
+
 class CmHandleCreateSpec extends CpsIntegrationSpecBase {

     NetworkCmProxyInventoryFacadeImpl objectUnderTest
     def uniqueId = 'ch-unique-id-for-create-test'

+    KafkaConsumer<String, LegacyEvent> kafkaConsumer
+
     def setup() {
         objectUnderTest = networkCmProxyInventoryFacade
         subscribeAndClearPreviousMessages('test-group', 'ncmp-events')
@@ -72,7 +78,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()

         then: 'get the latest messages'
-            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(2)
+            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)

         and: 'both converted messages are for the correct cm handle'
             def notificationMessages = []
@@ -90,7 +96,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             assert notificationMessages[1].event.dataProducerIdentifier == 'my-data-producer-identifier'

         and: 'there are no more messages to be read'
-            assert getLatestConsumerRecordsWithMaxPollOf1Second(1).size() == 0
+            assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1).size() == 0

         cleanup: 'deregister CM handle'
             deregisterCmHandle(DMI1_URL, uniqueId)
@@ -215,4 +221,10 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
     }

+    def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
+        kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
+        kafkaConsumer.subscribe([topicName])
+        kafkaConsumer.poll(Duration.ofMillis(500))
+    }
+
 }
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy
index 18c7096b66..1bb7ee2bca 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/inventory/CmHandleUpdateSpec.groovy
@@ -20,6 +20,9 @@

 package org.onap.cps.integration.functional.ncmp.inventory

+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.cps.events.LegacyEvent
+import org.onap.cps.integration.KafkaTestContainer
 import org.onap.cps.integration.base.CpsIntegrationSpecBase
 import org.onap.cps.ncmp.api.NcmpResponseStatus
 import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
@@ -28,10 +31,15 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl

+import java.time.Duration
+
 class CmHandleUpdateSpec extends CpsIntegrationSpecBase {

     NetworkCmProxyInventoryFacadeImpl objectUnderTest

+    KafkaConsumer<String, LegacyEvent> kafkaConsumer
+
+
     def setup() {
         objectUnderTest = networkCmProxyInventoryFacade
         subscribeAndClearPreviousMessages('test-group-for-update', 'ncmp-events')
@@ -111,7 +119,7 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
             moduleSyncWatchdog.moduleSyncAdvisedCmHandles()

         and: 'flush the latest cm handle registration events( state transition from NONE to ADVISED and ADVISED to READY)'
-            getLatestConsumerRecordsWithMaxPollOf1Second(2)
+            getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)

         and: 'cm handle updated with the data producer identifier'
             def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id')
@@ -122,7 +130,7 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
             assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]

         and: 'get the latest message'
-            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(1)
+            def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)

         and: 'the message has the updated data producer identifier'
             def notificationMessages = []
@@ -136,4 +144,9 @@ class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
             deregisterCmHandle(DMI1_URL, cmHandleId)
     }

+    def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
+        kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
+        kafkaConsumer.subscribe([topicName])
+        kafkaConsumer.poll(Duration.ofMillis(500))
+    }
 }
diff --git a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java
index 6494ab92ed..07735901f8 100644
--- a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java
+++ b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java
@@ -19,6 +19,8 @@

 package org.onap.cps.integration;

+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.CloudEventDeserializer;
 import java.util.HashMap;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +30,7 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.onap.cps.events.LegacyEvent;
 import org.testcontainers.kafka.ConfluentKafkaContainer;

 /**
@@ -68,8 +71,12 @@ public class KafkaTestContainer extends ConfluentKafkaContainer {
         return kafkaTestContainer;
     }

-    public static KafkaConsumer getConsumer(final String consumerGroupId, final Object valueDeserializer) {
-        return new KafkaConsumer<>(consumerProperties(consumerGroupId, valueDeserializer));
+    public static KafkaConsumer<String, LegacyEvent> getLegacyEventConsumer(final String consumerGroupId) {
+        return new KafkaConsumer<>(consumerProperties(consumerGroupId, StringDeserializer.class));
+    }
+
+    public static KafkaConsumer<String, CloudEvent> getCloudEventConsumer(final String consumerGroupId) {
+        return new KafkaConsumer<>(consumerProperties(consumerGroupId, CloudEventDeserializer.class));
     }

     /**
diff --git a/integration-test/src/test/resources/application.yml b/integration-test/src/test/resources/application.yml
index efe595ed7f..a59c0b8c6f 100644
--- a/integration-test/src/test/resources/application.yml
+++ b/integration-test/src/test/resources/application.yml
@@ -184,6 +184,10 @@ ncmp:
     model-loader:
         maximum-attempt-count: 20

+    notifications:
+        avc-event-producer:
+            transaction-id-prefix: integration-test-tx-
+
 servlet:
     multipart:
         enabled: true
-- 
2.16.6