model-loader:
maximum-attempt-count: 20
- notifications:
- avc-event-producer:
- transaction-id-prefix: tx-
-
# Custom Hazelcast Config.
hazelcast:
cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
model-loader:
maximum-attempt-count: 20
- notifications:
- avc-event-producer:
- transaction-id-prefix: test-tx-
-
# Custom Hazelcast Config.
hazelcast:
cluster-name: ${CPS_NCMP_CACHES_CLUSTER_NAME:"cps-and-ncmp-common-cache-cluster"}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.config;
-
-import io.cloudevents.CloudEvent;
-import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
-import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
-import java.time.Duration;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.ssl.SslBundles;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.KafkaException;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.listener.ContainerProperties;
-import org.springframework.kafka.listener.DefaultErrorHandler;
-import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
-import org.springframework.kafka.transaction.KafkaTransactionManager;
-
-/**
- * kafka Configuration for implementing Exactly Once Semantics using cloud events.
- */
-@Configuration
-@EnableKafka
-@RequiredArgsConstructor
-public class ExactlyOnceSemanticsKafkaConfig {
-
- private final KafkaProperties kafkaProperties;
-
- @Value("${cps.tracing.enabled:false}")
- private boolean tracingEnabled;
-
- @Value("${ncmp.notifications.avc-event-producer.transaction-id-prefix:tx-}")
- private String transactionIdPrefixForExactlyOnceSemantics;
-
- private static final SslBundles NO_SSL = null;
-
-
- /**
- * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
- * application.yml with CloudEventSerializer.This factory is configured to support
- * exactly-once semantics by enabling idempotence and setting a transaction ID prefix.
- *
- * @return cloud event producer instance.
- */
- @Bean
- public ProducerFactory<String, CloudEvent> cloudEventProducerFactoryForEos() {
- final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
- producerConfigProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
- producerConfigProperties.put(ProducerConfig.ACKS_CONFIG, "all");
- if (tracingEnabled) {
- producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
- TracingProducerInterceptor.class.getName());
- }
- final DefaultKafkaProducerFactory<String, CloudEvent> defaultKafkaProducerFactory =
- new DefaultKafkaProducerFactory<>(producerConfigProperties);
- defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefixForExactlyOnceSemantics);
- return defaultKafkaProducerFactory;
- }
-
- /**
- * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
- * into application.yml having CloudEventDeserializer as deserializer-value.This factory is configured with
- * read_committed isolation level to support exactly-once semantics.
- *
- * @return an instance of cloud consumer factory.
- */
- @Bean
- public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactoryForEos() {
- final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
- consumerConfigProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
- if (tracingEnabled) {
- consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
- TracingConsumerInterceptor.class.getName());
- }
- return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
- }
-
-
- /**
- * A cloud Kafka event template for executing high-level operations. The template is configured using the cloud
- * event producer and consumer factories to support
- * exactly-once semantics.
- *
- * @return an instance of cloud Kafka template.
- */
- @Bean(name = "cloudEventKafkaTemplateForEos")
- public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos() {
- final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactoryForEos());
- kafkaTemplate.setConsumerFactory(cloudEventConsumerFactoryForEos());
- if (tracingEnabled) {
- kafkaTemplate.setObservationEnabled(true);
- }
- return kafkaTemplate;
- }
-
- /**
- * A Concurrent CloudEvent kafka listener container factory.
- * This factory supports exactly-once semantics, retry handling, and optional tracing.
- *
- * @return instance of Concurrent kafka listener factory
- */
- @Bean
- public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
- cloudEventConcurrentKafkaListenerContainerFactoryForEos() {
- final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
- new ConcurrentKafkaListenerContainerFactory<>();
- containerFactory.setConsumerFactory(cloudEventConsumerFactoryForEos());
- containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
- containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
- containerFactory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManagerForEos());
- containerFactory.setCommonErrorHandler(kafkaErrorHandlerWithMaxRetriesForEos());
- if (tracingEnabled) {
- containerFactory.getContainerProperties().setObservationEnabled(true);
- }
-
- return containerFactory;
- }
-
- private KafkaTransactionManager<String, CloudEvent> kafkaTransactionManagerForEos() {
- return new KafkaTransactionManager<>(cloudEventProducerFactoryForEos());
- }
-
- private DefaultErrorHandler kafkaErrorHandlerWithMaxRetriesForEos() {
-
- final ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries =
- new ExponentialBackOffWithMaxRetries(Integer.MAX_VALUE);
- exponentialBackOffWithMaxRetries.setInitialInterval(1000L); // 1 sec
- exponentialBackOffWithMaxRetries.setMultiplier(2.0);
- exponentialBackOffWithMaxRetries.setMaxInterval(30_000L); // 30 sec
- final DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(exponentialBackOffWithMaxRetries);
- defaultErrorHandler.addRetryableExceptions(KafkaException.class);
-
- return defaultErrorHandler;
- }
-}
*
* @return an instance of legacy Kafka template.
*/
- @Bean(name = "legacyEventKafkaTemplate")
+ @Bean
+ @Primary
public KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate() {
final KafkaTemplate<String, LegacyEvent> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
*
* @return an instance of cloud Kafka template.
*/
- @Primary
- @Bean(name = "cloudEventKafkaTemplate")
+ @Bean
public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
+ * Copyright (c) 2023-2026 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
/**
* Listener for AVC events based on CM Subscriptions.
*
* @param cmAvcEventAsConsumerRecord Incoming raw consumer record
*/
- @Transactional
@KafkaListener(topics = "${app.dmi.cm-events.topic}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactoryForEos")
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
@Timed(value = "cps.ncmp.cm.notifications.consume.and.forward", description = "Time taken to forward CM AVC events")
public void consumeAndForward(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) {
}
final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
-
log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
- eventProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
+ eventProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
}
private void processCmAvcEventChanges(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
private boolean isEventFromOnapDmiPlugin(final Headers headers) {
final String sourceSystem = KafkaHeaders.getParsedKafkaHeader(headers, CLOUD_EVENT_SOURCE_SYSTEM_HEADER_KEY);
- return "ONAP-DMI-PLUGIN".equals(sourceSystem);
+ return sourceSystem != null && sourceSystem.equals("ONAP-DMI-PLUGIN");
}
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
+ * Copyright (C) 2023-2026 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.onap.cps.events.LegacyEvent
import org.spockframework.spring.EnableSharedInjection
import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.test.context.SpringBootTest
import spock.lang.Shared
import spock.lang.Specification
-@SpringBootTest(classes = [KafkaProperties, KafkaConfig, ExactlyOnceSemanticsKafkaConfig])
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
@EnableSharedInjection
@EnableConfigurationProperties
@TestPropertySource(properties = ["cps.tracing.enabled=true"])
@Shared
@Autowired
- @Qualifier("legacyEventKafkaTemplate")
KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate
@Shared
@Autowired
- @Qualifier("cloudEventKafkaTemplate")
KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
- @Shared
- @Autowired
- @Qualifier("cloudEventKafkaTemplateForEos")
- KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos
-
-
def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
expect: 'kafka template is instantiated'
assert kafkaTemplateInstance.properties['beanName'] == beanName
and: 'verify event key and value deserializer'
assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
where: 'the following event type is used'
- eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer
- 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer
- 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer
- 'transactional cloud event' | cloudEventKafkaTemplateForEos || 'cloudEventKafkaTemplateForEos' | CloudEventSerializer | CloudEventDeserializer
+ eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer
+ 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer
+ 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer
}
}
@SpringBean
EventProducer cpsAsyncRequestResponseEventProducer =
- new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
class DataOperationEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventProducer asyncDataOperationEventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ EventProducer asyncDataOperationEventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
@SpringBean
- EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
def 'Process per data operation request with #serviceName.'() {
given: 'data operation request with 3 operations'
class CmAvcEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
def mockCmAvcEventService = Mock(CmAvcEventService)
def mockInventoryPersistence = Mock(InventoryPersistence)
@SpringBean
- CmAvcEventConsumer objectUnderTest = new CmAvcEventConsumer(eventsProducer, mockCmAvcEventService, mockInventoryPersistence)
+ CmAvcEventConsumer objectUnderTest = new CmAvcEventConsumer(eventProducer, mockCmAvcEventService, mockInventoryPersistence)
@Autowired
JsonObjectMapper jsonObjectMapper
.withSource(URI.create(sourceSystem as String))
.withExtension('correlationid', cmHandleId).build()
}
}
class EventProducerSpec extends MessagingBaseSpec {
def legacyEventKafkaConsumer = new KafkaConsumer<String, LegacyEvent>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
+
def testTopic = 'ncmp-events-test'
@SpringBean
- EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@Autowired
JsonObjectMapper jsonObjectMapper
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2026 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.utils.events
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.slf4j.LoggerFactory
+import spock.lang.Specification
+
+class CloudEventMapperSpec extends Specification {
+
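+ // in-memory Logback appender capturing CloudEventMapper output so assertions can inspect logged messages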
+ def logger = Spy(ListAppender<ILoggingEvent>)
+
+ void setup() {
+ def setupLogger = ((Logger) LoggerFactory.getLogger(CloudEventMapper.class))
+ setupLogger.setLevel(Level.DEBUG)
+ setupLogger.addAppender(logger)
+ logger.start()
+ }
+
+ void cleanup() {
+ ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders()
+ }
+
+ def 'Map cloud event with runtime exception'() {
+ given: 'a cloud event with invalid data'
+ def cloudEvent = CloudEventBuilder.v1()
+ .withId('test-id')
+ .withType('test-type')
+ .withSource(URI.create('test-source'))
+ .withData('invalid-json-data'.bytes)
+ .build()
+ when: 'mapping to target event class'
+ CloudEventMapper.toTargetEvent(cloudEvent, String.class)
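+ // no exception is expected to propagate; the mapper should log the failure instead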
+ then: 'error message is logged'
+ def loggingEvents = logger.list
+ assert loggingEvents.size() >= 1
+ def errorEvent = loggingEvents.find { it.level == Level.ERROR }
+ assert errorEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
+ assert errorEvent.formattedMessage.contains('class java.lang.String')
+ }
+}
def legacyEventKafkaTemplate = new KafkaTemplate<String, LegacyEvent>(new DefaultKafkaProducerFactory<String, LegacyEvent>(eventProducerConfigProperties(JsonSerializer)))
def cloudEventKafkaTemplate = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
-
- def cloudEventKafkaTemplateForEos = new KafkaTemplate<String, CloudEvent>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
-
@DynamicPropertySource
static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
@Qualifier("cloudEventKafkaTemplate")
private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
- @Qualifier("cloudEventKafkaTemplateForEos")
- private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplateForEos;
-
/**
* Generic CloudEvent sender.
*
public void sendCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
cloudEventKafkaTemplate.send(topicName, eventKey, event);
- eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
+ eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
}
/**
handleLegacyEventCallback(topicName, eventFuture);
}
-
/**
* Legacy Event sender with headers in a Map. Schemas that implement LegacyEvent are eligible to use this method.
*
handleLegacyEventCallback(topicName, eventFuture);
}
- /**
- * Generic CloudEvent sender ensuring Exactly Once Semantics behaviour.
- *
- * @param topicName valid topic name
- * @param eventKey message key
- * @param event message payload
- */
- public void sendCloudEventUsingEos(final String topicName, final String eventKey, final CloudEvent event) {
- final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
- cloudEventKafkaTemplateForEos.send(topicName, eventKey, event);
- eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, true));
- }
private void handleLegacyEventCallback(final String topicName,
final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture) {
- eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e, false));
+ eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
}
private Headers convertToKafkaHeaders(final Map<String, Object> headersAsMap) {
return headers;
}
- private static void logOutcome(final String topicName, final SendResult<String, ?> result, final Throwable e,
- final boolean throwKafkaException) {
+ private static void logOutcome(final String topicName, final SendResult<String, ?> result, final Throwable e) {
if (e == null) {
final Object event = result.getProducerRecord().value();
log.debug("Successfully sent event to topic : {} , Event : {}", topicName, event);
} else {
log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage());
- if (throwKafkaException && e instanceof KafkaException) {
- throw (KafkaException) e;
- }
}
}
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.SendResult
-import org.springframework.util.SerializationUtils
import spock.lang.Specification
import java.util.concurrent.CompletableFuture
def mockLegacyKafkaTemplate = Mock(KafkaTemplate)
def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
- def mockCloudEventKafkaTemplateForEos = Mock(KafkaTemplate)
def logger = Spy(ListAppender<ILoggingEvent>)
void setup() {
((Logger) LoggerFactory.getLogger(EventProducer.class)).detachAndStopAllAppenders()
}
- def objectUnderTest = new EventProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos)
+ def objectUnderTest = new EventProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate)
def 'Send Cloud Event'() {
given: 'a successfully sent event'
assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event')
}
- def 'Send Cloud Event Using EOS'() {
- given: 'a successfull result from send event with EOS semantics'
- def eventFuture = CompletableFuture.completedFuture(
- new SendResult(
- new ProducerRecord('eos-topic', 'some-value'),
- new RecordMetadata(new TopicPartition('eos-topic', 0), 0, 0, 0, 0, 0)
- )
- )
- def someCloudEvent = Mock(CloudEvent)
- 1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFuture
- when: 'sending the cloud event using EOS'
- objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
- then: 'the correct debug message is logged'
- assert verifyLoggingEvent(Level.DEBUG, 'Successfully sent event') == true
- }
-
def 'Send Cloud Event with Exception'() {
given: 'a failed event'
def eventFutureWithFailure = new CompletableFuture<SendResult<String, String>>()
assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
}
- def 'Send Cloud Event Using EOS with KafkaException'() {
- given: 'an event fails with KafkaException'
- def kafkaException = new org.springframework.kafka.KafkaException('some kafka exception')
- def eventFutureWithFailure = new CompletableFuture<SendResult<String, CloudEvent>>()
- eventFutureWithFailure.completeExceptionally(kafkaException)
- def someCloudEvent = Mock(CloudEvent)
- 1 * mockCloudEventKafkaTemplateForEos.send('eos-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure
- when: 'sending the cloud event using EOS'
- objectUnderTest.sendCloudEventUsingEos('eos-topic', 'some-event-key', someCloudEvent)
- then: 'the correct error message is logged'
- assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
- }
-
def 'Send Legacy Event'() {
given: 'a successfully sent event'
def eventFuture = CompletableFuture.completedFuture(
def 'Send Legacy Event with Headers as Map'() {
given: 'a successfully sent event'
- def sampleEventHeaders = ['k1': SerializationUtils.serialize('v1')]
+ def sampleEventHeaders = ['k1': 'v1', 'k2': 'v2']
def eventFuture = CompletableFuture.completedFuture(
new SendResult(
new ProducerRecord('some-topic', 'some-value'),
getProducerRecord() >> Mock(ProducerRecord)
}
def runtimeException = new RuntimeException('some runtime exception')
- def logOutcomeMethod = EventProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean)
+ def logOutcomeMethod = EventProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable)
logOutcomeMethod.accessible = true
- when: 'logging the outcome with throwKafkaException set to true'
+ when: 'logging the outcome with a runtime exception'
- logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException, true)
+ logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException)
then: 'error message is logged'
assert verifyLoggingEvent(Level.ERROR, 'Unable to send event') == true
}
lastLoggingEvent.level == expectedLevel && lastLoggingEvent.formattedMessage.contains(expectedFormattedMessage)
}
}
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,CONNECTIONS_FROM_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
healthcheck:
test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1
interval: 10s
model-loader:
maximum-attempt-count: 20
- notifications:
- avc-event-producer:
- transaction-id-prefix: integration-test-tx-
-
servlet:
multipart:
enabled: true