/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2022 Nordix Foundation.
+ * Copyright (c) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.ncmp.api.kafka
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.DefaultKafkaProducerFactory
class MessagingBaseSpec extends Specification {
- static {
- Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
- }
-
def setupSpec() {
kafkaTestContainer.start()
}
+ def cleanupSpec() {
+ kafkaTestContainer.stop()
+ }
+
static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
- def producerConfigProperties() {
+ def legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(eventProducerConfigProperties(JsonSerializer)))
+
+ def cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+
+ @DynamicPropertySource
+ static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+ dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+ }
+
+ def eventProducerConfigProperties(valueSerializer) {
return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
('retries') : 0,
('batch-size') : 16384,
('linger.ms') : 1,
('buffer.memory') : 33554432,
('key.serializer') : StringSerializer,
- ('value.serializer') : JsonSerializer]
+ ('value.serializer') : valueSerializer]
}
- def consumerConfigProperties(consumerGroupId) {
+ def eventConsumerConfigProperties(consumerGroupId, valueSerializer) {
return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
('key.deserializer') : StringDeserializer,
- ('value.deserializer'): StringDeserializer,
+ ('value.deserializer'): valueSerializer,
('auto.offset.reset') : 'earliest',
('group.id') : consumerGroupId
]
}
-
- def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
-
- @DynamicPropertySource
- static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
- dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
- }
}