model-loader:
maximum-attempt-count: 20
+ notifications:
+ avc-event-producer:
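+ # Prefix for the transactional ids of the AVC event producer (required for exactly-once semantics)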
+ transaction-id-prefix: tx-
+
# Custom Hazelcast Config.
hazelcast:
cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
model-loader:
maximum-attempt-count: 20
+ notifications:
+ avc-event-producer:
+ transaction-id-prefix: test-tx-
+
# Custom Hazelcast Config.
hazelcast:
cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.config;
+
+import io.cloudevents.CloudEvent;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
+import java.time.Duration;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.ssl.SslBundles;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.KafkaException;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
+import org.springframework.kafka.transaction.KafkaTransactionManager;
+
+/**
+ * Kafka configuration for implementing exactly-once semantics using cloud events.
+ */
+@Configuration
+@EnableKafka
+@RequiredArgsConstructor
+public class ExactlyOnceSemanticsKafkaConfig {
+
+ private final KafkaProperties kafkaProperties;
+
+ @Value("${cps.tracing.enabled:false}")
+ private boolean tracingEnabled;
+
+ @Value("${ncmp.notifications.avc-event-producer.transaction-id-prefix:tx-}")
+ private String transactionIdPrefixForExactlyOnceSemantics;
+
+ private static final SslBundles NO_SSL = null;
+
+
+ /**
+ * Sets the strategy for creating CloudEvent Kafka producer instances from the Kafka properties defined in
+ * application.yml, using CloudEventSerializer. This factory is configured to support exactly-once semantics
+ * by enabling idempotence and setting a transaction id prefix.
+ *
+ * @return cloud event producer factory instance.
+ */
+ @Bean
+ public ProducerFactory<String, CloudEvent> cloudEventProducerFactoryForEos() {
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
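+ // Idempotence (with acks=all) prevents duplicates from producer retries; the transaction id prefix
+ // set on the factory below makes each producer transactional.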
+ producerConfigProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ producerConfigProperties.put(ProducerConfig.ACKS_CONFIG, "all");
+ if (tracingEnabled) {
+ producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ TracingProducerInterceptor.class.getName());
+ }
+ final DefaultKafkaProducerFactory<String, CloudEvent> defaultKafkaProducerFactory =
+ new DefaultKafkaProducerFactory<>(producerConfigProperties);
+ defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefixForExactlyOnceSemantics);
+ return defaultKafkaProducerFactory;
+ }
+
+ /**
+ * The ConsumerFactory implementation that creates CloudEvent consumer instances from the Kafka properties
+ * defined in application.yml, using CloudEventDeserializer as the value deserializer. This factory is
+ * configured with the read_committed isolation level to support exactly-once semantics.
+ *
+ * @return an instance of the cloud event consumer factory.
+ */
+ @Bean
+ public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactoryForEos() {
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
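+ // read_committed ensures this consumer only reads records from committed transactions.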
+ consumerConfigProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+ if (tracingEnabled) {
+ consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ TracingConsumerInterceptor.class.getName());
+ }
+ return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+ }
+
+
+ /**
+ * A CloudEvent Kafka template for executing high-level operations. The template is configured with the
+ * cloud event producer and consumer factories to support exactly-once semantics.
+ *
+ * @return an instance of cloud Kafka template.
+ */
+ @Bean(name = "cloudEventKafkaTemplateForEos")
+ public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos() {
+ final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactoryForEos());
+ kafkaTemplate.setConsumerFactory(cloudEventConsumerFactoryForEos());
+ if (tracingEnabled) {
+ kafkaTemplate.setObservationEnabled(true);
+ }
+ return kafkaTemplate;
+ }
+
+ /**
+ * A concurrent CloudEvent Kafka listener container factory.
+ * This factory supports exactly-once semantics, retry handling, and optional tracing.
+ *
+ * @return instance of the concurrent Kafka listener container factory
+ */
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
+ cloudEventConcurrentKafkaListenerContainerFactoryForEos() {
+ final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ containerFactory.setConsumerFactory(cloudEventConsumerFactoryForEos());
+ containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
+ containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
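+ // The Kafka-aware transaction manager lets the container send consumed offsets to the producer
+ // transaction, giving consume-process-produce exactly-once behaviour.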
+ containerFactory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManagerForEos());
+ containerFactory.setCommonErrorHandler(kafkaErrorHandlerWithMaxRetriesForEos());
+ if (tracingEnabled) {
+ containerFactory.getContainerProperties().setObservationEnabled(true);
+ }
+
+ return containerFactory;
+ }
+
+ private KafkaTransactionManager<String, CloudEvent> kafkaTransactionManagerForEos() {
+ return new KafkaTransactionManager<>(cloudEventProducerFactoryForEos());
+ }
+
+ private DefaultErrorHandler kafkaErrorHandlerWithMaxRetriesForEos() {
+
+ final ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries =
+ new ExponentialBackOffWithMaxRetries(Integer.MAX_VALUE);
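+ // Integer.MAX_VALUE max retries means failed records are retried (effectively) indefinitely,
+ // with the back-off interval capped at 30 seconds below.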
+ exponentialBackOffWithMaxRetries.setInitialInterval(1000L); // 1 sec
+ exponentialBackOffWithMaxRetries.setMultiplier(2.0);
+ exponentialBackOffWithMaxRetries.setMaxInterval(30_000L); // 30 sec
+ final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(exponentialBackOffWithMaxRetries);
+ defaultErrorHandler.addRetryableExceptions(KafkaException.class);
+
+ return defaultErrorHandler;
+ }
+}
*
* @return an instance of legacy Kafka template.
*/
- @Bean
- @Primary
+ @Bean(name = "legacyEventKafkaTemplate")
public KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate() {
final KafkaTemplate<String, LegacyEvent> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
*
* @return an instance of cloud Kafka template.
*/
- @Bean
+ @Primary
+ @Bean(name = "cloudEventKafkaTemplate")
public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.KafkaException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
/**
* Listener for AVC events based on CM Subscriptions.
*
* @param cmAvcEventAsConsumerRecord Incoming raw consumer record
*/
+ @Transactional
@KafkaListener(topics = "${app.dmi.cm-events.topic}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactoryForEos")
@Timed(value = "cps.ncmp.cm.notifications.consume.and.forward", description = "Time taken to forward CM AVC events")
public void consumeAndForward(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) {
}
final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
- log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
- eventsProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
+
+ // Only for testing/demo: force a retryable failure to exercise the exactly-once retry handling
+ if (outgoingAvcEventKey.equals("retry")) {
+ throw new KafkaException("test kafka exception for testing");
+ }
+
+ log.info("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
+ eventsProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
}
private void processCmAvcEventChanges(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
import org.onap.cps.events.LegacyEvent
import org.spockframework.spring.EnableSharedInjection
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.test.context.SpringBootTest
import spock.lang.Shared
import spock.lang.Specification
-@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig, ExactlyOnceSemanticsKafkaConfig])
@EnableSharedInjection
@EnableConfigurationProperties
@TestPropertySource(properties = ["cps.tracing.enabled=true"])
@Shared
@Autowired
+ @Qualifier("legacyEventKafkaTemplate")
KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate
@Shared
@Autowired
+ @Qualifier("cloudEventKafkaTemplate")
KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
+ @Shared
+ @Autowired
+ @Qualifier("cloudEventKafkaTemplateForEos")
+ KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos
+
+
def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
expect: 'kafka template is instantiated'
assert kafkaTemplateInstance.properties['beanName'] == beanName
and: 'verify event key and value deserializer'
assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
where: 'the following event type is used'
- eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer
- 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer
- 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer
+ eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer
+ 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer
+ 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer
+ 'transactional cloud event' | cloudEventKafkaTemplateForEos || 'cloudEventKafkaTemplateForEos' | CloudEventSerializer | CloudEventDeserializer
}
}
@SpringBean
EventsProducer cpsAsyncRequestResponseEventProducer =
- new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
+ new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
@SpringBean
class DataOperationEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
@SpringBean
DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
@SpringBean
- EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
def 'Process per data operation request with #serviceName.'() {
given: 'data operation request with 3 operations'
class CmAvcEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
def mockCmAvcEventService = Mock(CmAvcEventService)
def mockInventoryPersistence = Mock(InventoryPersistence)
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.LegacyEvent
import org.onap.cps.ncmp.events.lcm.v1.Event
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
import org.onap.cps.ncmp.utils.TestUtils
@DirtiesContext
class EventsProducerSpec extends MessagingBaseSpec {
- def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
+ def legacyEventKafkaConsumer = new KafkaConsumer<String, LegacyEvent>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
def testTopic = 'ncmp-events-test'
@SpringBean
- EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
@Autowired
JsonObjectMapper jsonObjectMapper
@ContextConfiguration(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper])
class InventoryEventProducerSpec extends MessagingBaseSpec {
- def mockEventsProducer = Mock(EventsProducer<CloudEvent>)
+ def mockEventsProducer = Mock(EventsProducer)
def objectUnderTest = new InventoryEventProducer(mockEventsProducer)
def 'Send an attribute value change event'() {
import io.cloudevents.kafka.CloudEventSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.cps.events.LegacyEvent
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.JsonSerializer
static kafkaTestContainer = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0")
- def legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, ?>(eventProducerConfigProperties(JsonSerializer)))
+ def legacyEventKafkaTemplate = new KafkaTemplate<String, LegacyEvent>(new DefaultKafkaProducerFactory<String, LegacyEvent>(eventProducerConfigProperties(JsonSerializer)))
- def cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+ def cloudEventKafkaTemplate = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+
+ def cloudEventKafkaTemplateForEos = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
@DynamicPropertySource
static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
* Note: Cloud events should be used. This will be addressed as part of <a
* href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
*/
- private final KafkaTemplate<String, LegacyEvent> legacyKafkaEventTemplate;
+ @Qualifier("legacyEventKafkaTemplate")
+ private final KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate;
+ @Qualifier("cloudEventKafkaTemplate")
private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
+ @Qualifier("cloudEventKafkaTemplateForEos")
+ private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos;
+
/**
* Generic CloudEvent sender.
*
public void sendCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
cloudEventKafkaTemplate.send(topicName, eventKey, event);
- eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
+ eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
}
/**
*/
public void sendLegacyEvent(final String topicName, final String eventKey, final LegacyEvent event) {
final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
- legacyKafkaEventTemplate.send(topicName, eventKey, event);
+ legacyEventKafkaTemplate.send(topicName, eventKey, event);
handleLegacyEventCallback(topicName, eventFuture);
}
final ProducerRecord<String, LegacyEvent> producerRecord =
new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
- legacyKafkaEventTemplate.send(producerRecord);
+ legacyEventKafkaTemplate.send(producerRecord);
handleLegacyEventCallback(topicName, eventFuture);
}
sendLegacyEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
}
+ /**
+ * Generic CloudEvent sender ensuring exactly-once semantics behaviour.
+ *
+ * @param topicName valid topic name
+ * @param eventKey message key
+ * @param event message payload
+ */
+ public void sendCloudEventUsingEos(final String topicName, final String eventKey, final CloudEvent event) {
+ final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
+ cloudEventKafkaTemplateForEos.send(topicName, eventKey, event);
+ eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, true));
+ }
+
+
private void handleLegacyEventCallback(final String topicName,
final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture) {
- eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
+ eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
}
private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
return eventHeaders;
}
- private static void logOutcome(final String topicName, final SendResult<String, ?> result, final Throwable e) {
+ private static void logOutcome(final String topicName, final SendResult<String, ?> result, final Throwable e,
+ final boolean throwKafkaException) {
if (e == null) {
final Object event = result.getProducerRecord().value();
log.debug("Successfully sent event to topic : {} , Event : {}", topicName, event);
} else {
log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage());
+ if (throwKafkaException && e instanceof KafkaException) {
+ throw (KafkaException) e;
+ }
}
}
class EventsProducerSpec extends Specification {
- def legacyKafkaTemplateMock = Mock(KafkaTemplate)
+ def mockLegacyKafkaTemplate = Mock(KafkaTemplate)
def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
+ def mockCloudEventKafkaTemplateForEos = Mock(KafkaTemplate)
def logger = Spy(ListAppender<ILoggingEvent>)
void setup() {
((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
}
- def objectUnderTest = new EventsProducer(legacyKafkaTemplateMock, mockCloudEventKafkaTemplate)
+ def objectUnderTest = new EventsProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos)
def 'Send Cloud Event'() {
given: 'a successfully sent event'
when: 'sending the cloud event'
objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
then: 'the correct debug message is logged'
- def lastLoggingEvent = logger.list[0]
- assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+ assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event')
+ }
+
+ def 'Send Cloud Event Using EOS'() {
+ given: 'a successful result from sending an event with exactly-once semantics'
+ def eventFuture = CompletableFuture.completedFuture(
+ new SendResult(
+ new ProducerRecord('eos-topic', 'some-value'),
+ new RecordMetadata(new TopicPartition('eos-topic', 0), 0, 0, 0, 0, 0)
+ )
+ )
+ def someCloudEvent = Mock(CloudEvent)
+ 1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFuture
+ when: 'sending the cloud event using EOS'
+ objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
+ then: 'the correct debug message is logged'
+ assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
}
def 'Send Cloud Event with Exception'() {
when: 'sending the cloud event'
objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
then: 'the correct error message is logged'
- def lastLoggingEvent = logger.list[0]
- assert lastLoggingEvent.level == Level.ERROR
- assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
+ assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
+ }
+
+ def 'Send Cloud Event Using EOS with KafkaException'() {
+ given: 'an event fails with KafkaException'
+ def kafkaException = new org.springframework.kafka.KafkaException('some kafka exception')
+ def eventFutureWithFailure = new CompletableFuture<SendResult<String, CloudEvent>>()
+ eventFutureWithFailure.completeExceptionally(kafkaException)
+ def someCloudEvent = Mock(CloudEvent)
+ 1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure
+ when: 'sending the cloud event using EOS'
+ objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
+ then: 'the correct error message is logged'
+ assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
}
def 'Send Legacy Event'() {
)
)
def someLegacyEvent = Mock(LegacyEvent)
- 1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someLegacyEvent) >> eventFuture
+ 1 * mockLegacyKafkaTemplate.send('some-topic', 'some-event-key', someLegacyEvent) >> eventFuture
when: 'sending the cloud event'
objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', someLegacyEvent)
then: 'the correct debug message is logged'
- def lastLoggingEvent = logger.list[0]
- assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+ assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
}
def 'Send Legacy Event with Headers as Map'() {
when: 'sending the legacy event'
objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
then: 'event is sent'
- 1 * legacyKafkaTemplateMock.send(_) >> eventFuture
+ 1 * mockLegacyKafkaTemplate.send(_) >> eventFuture
and: 'the correct debug message is logged'
- def lastLoggingEvent = logger.list[0]
- assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+ assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
}
def 'Send Legacy Event with Record Headers'() {
when: 'sending the legacy event'
objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
then: 'event is sent'
- 1 * legacyKafkaTemplateMock.send(_) >> eventFuture
+ 1 * mockLegacyKafkaTemplate.send(_) >> eventFuture
and: 'the correct debug message is logged'
- def lastLoggingEvent = logger.list[0]
- assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+ assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
}
def 'Handle Legacy Event Callback'() {
when: 'handling legacy event callback'
objectUnderTest.handleLegacyEventCallback('some-topic', eventFuture)
then: 'the correct debug message is logged'
- def lastLoggingEvent = logger.list[0]
- assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
+ assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
}
def 'Handle Legacy Event Callback with Exception'() {
when: 'handling legacy event callback'
objectUnderTest.handleLegacyEventCallback('some-topic', eventFutureWithFailure)
then: 'the correct error message is logged'
- def lastLoggingEvent = logger.list[0]
- assert lastLoggingEvent.level == Level.ERROR
- assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
+ assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
+ }
+
+ def 'Logging of non-Kafka exceptions'() {
+ given: 'a runtime exception that is not a KafkaException'
+ def sendResult = Mock(SendResult) {
+ getProducerRecord() >> Mock(ProducerRecord)
+ }
+ def runtimeException = new RuntimeException('some runtime exception')
+ def logOutcomeMethod = EventsProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean)
+ logOutcomeMethod.accessible = true
+ when: 'logging the outcome with throwKafkaException set to true'
+ logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException, true)
+ then: 'error message is logged'
+ assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
}
def 'Convert to kafka headers'() {
assert headers[1].key() == 'key2'
}
+ def verifyLoggingEvent(expectedLevel, expectedFormattedMessage) {
+ def firstLoggingEvent = logger.list[0]
+ firstLoggingEvent.level == expectedLevel && firstLoggingEvent.formattedMessage.contains(expectedFormattedMessage)
+ }
+
}
\ No newline at end of file
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,CONNECTIONS_FROM_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
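+ # Kafka transactions use the internal __transaction_state topic; a single-broker test setup needs replication factor and min ISR of 1.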
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
healthcheck:
test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1
interval: 10s
import com.hazelcast.map.IMap
import okhttp3.mockwebserver.MockWebServer
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.api.CpsAnchorService
import org.onap.cps.api.CpsDataService
import org.onap.cps.api.CpsDataspaceService
@EntityScan('org.onap.cps.ri.models')
abstract class CpsIntegrationSpecBase extends Specification {
- static KafkaConsumer kafkaConsumer
-
@Shared
DatabaseTestContainer databaseTestContainer = DatabaseTestContainer.getInstance()
networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
}
- def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
- kafkaConsumer = KafkaTestContainer.getConsumer(consumerGroupId, StringDeserializer.class)
- kafkaConsumer.subscribe([topicName])
- kafkaConsumer.poll(Duration.ofMillis(500))
- }
-
- def getLatestConsumerRecordsWithMaxPollOf1Second(numberOfRecordsToRead) {
+ def getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, numberOfRecordsToRead) {
def consumerRecords = []
def retryAttempts = 10
while (consumerRecords.size() < numberOfRecordsToRead) {
package org.onap.cps.integration.functional.ncmp.data
import io.cloudevents.CloudEvent
-import io.cloudevents.kafka.CloudEventDeserializer
-import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.cps.integration.KafkaTestContainer
import org.onap.cps.integration.base.CpsIntegrationSpecBase
import org.springframework.http.MediaType
import spock.util.concurrent.PollingConditions
+import java.time.Duration
+
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status
KafkaConsumer kafkaConsumer
def setup() {
- kafkaConsumer = KafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
+ kafkaConsumer = KafkaTestContainer.getCloudEventConsumer('test-group')
kafkaConsumer.subscribe(['legacy-batch-topic'])
kafkaConsumer.poll(Duration.ofMillis(500))
dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
.andExpect(status().is2xxSuccessful())
then: 'there is one kafka message'
- def consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000))
+ def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)
assert consumerRecords.size() == 1
and: 'it is a cloud event'
def setup() {
registerCmHandlesForSubscriptions()
kafkaTestContainer.start()
- dmiInConsumer = kafkaTestContainer.getConsumer('test-group', CloudEventDeserializer.class)
+ dmiInConsumer = kafkaTestContainer.getCloudEventConsumer('test-group')
dmiInConsumer.subscribe([dmiInTopic])
dmiInConsumer.poll(Duration.ofMillis(500))
testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
}
def getAllConsumedCorrelationIds() {
- def consumedEvents = dmiInConsumer.poll(Duration.ofMillis(1000))
+ def consumedEvents = getLatestConsumerRecordsWithMaxPollOf1Second(dmiInConsumer, 1)
def headersMap = getAllHeaders(consumedEvents)
return headersMap.get('ce_correlationid')
}
package org.onap.cps.integration.functional.ncmp.inventory
-
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.cps.events.LegacyEvent
+import org.onap.cps.integration.KafkaTestContainer
import org.onap.cps.integration.base.CpsIntegrationSpecBase
import org.onap.cps.ncmp.api.NcmpResponseStatus
import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
+import java.time.Duration
+
class CmHandleCreateSpec extends CpsIntegrationSpecBase {
NetworkCmProxyInventoryFacadeImpl objectUnderTest
def uniqueId = 'ch-unique-id-for-create-test'
+ KafkaConsumer<String, LegacyEvent> kafkaConsumer
+
def setup() {
objectUnderTest = networkCmProxyInventoryFacade
subscribeAndClearPreviousMessages('test-group', 'ncmp-events')
assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
then: 'get the latest messages'
- def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(2)
+ def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
and: 'both converted messages are for the correct cm handle'
def notificationMessages = []
assert notificationMessages[1].event.dataProducerIdentifier == 'my-data-producer-identifier'
and: 'there are no more messages to be read'
- assert getLatestConsumerRecordsWithMaxPollOf1Second(1).size() == 0
+ assert getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1).size() == 0
cleanup: 'deregister CM handle'
deregisterCmHandle(DMI1_URL, uniqueId)
deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
}
+ def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
+ kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
+ kafkaConsumer.subscribe([topicName])
+ kafkaConsumer.poll(Duration.ofMillis(500))
+ }
+
}
package org.onap.cps.integration.functional.ncmp.inventory
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.cps.events.LegacyEvent
+import org.onap.cps.integration.KafkaTestContainer
import org.onap.cps.integration.base.CpsIntegrationSpecBase
import org.onap.cps.ncmp.api.NcmpResponseStatus
import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
+import java.time.Duration
+
class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
NetworkCmProxyInventoryFacadeImpl objectUnderTest
+ KafkaConsumer<String, LegacyEvent> kafkaConsumer
+
+
def setup() {
objectUnderTest = networkCmProxyInventoryFacade
subscribeAndClearPreviousMessages('test-group-for-update', 'ncmp-events')
moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
and: 'flush the latest cm handle registration events( state transition from NONE to ADVISED and ADVISED to READY)'
- getLatestConsumerRecordsWithMaxPollOf1Second(2)
+ getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 2)
and: 'cm handle updated with the data producer identifier'
def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id')
assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
and: 'get the latest message'
- def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(1)
+ def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(kafkaConsumer, 1)
and: 'the message has the updated data producer identifier'
def notificationMessages = []
deregisterCmHandle(DMI1_URL, cmHandleId)
}
+ def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
+ kafkaConsumer = KafkaTestContainer.getLegacyEventConsumer(consumerGroupId)
+ kafkaConsumer.subscribe([topicName])
+ kafkaConsumer.poll(Duration.ofMillis(500))
+ }
}
package org.onap.cps.integration;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.CloudEventDeserializer;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.onap.cps.events.LegacyEvent;
import org.testcontainers.kafka.ConfluentKafkaContainer;
/**
return kafkaTestContainer;
}
- public static KafkaConsumer getConsumer(final String consumerGroupId, final Object valueDeserializer) {
- return new KafkaConsumer<>(consumerProperties(consumerGroupId, valueDeserializer));
+ public static KafkaConsumer<String, LegacyEvent> getLegacyEventConsumer(final String consumerGroupId) {
+ return new KafkaConsumer<>(consumerProperties(consumerGroupId, StringDeserializer.class));
+ }
+
+ public static KafkaConsumer<String, CloudEvent> getCloudEventConsumer(final String consumerGroupId) {
+ return new KafkaConsumer<>(consumerProperties(consumerGroupId, CloudEventDeserializer.class));
}
/**
model-loader:
maximum-attempt-count: 20
+ notifications:
+ avc-event-producer:
+ transaction-id-prefix: integration-test-tx-
+
servlet:
multipart:
enabled: true