1 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
3 import org.apache.kafka.clients.CommonClientConfigs
4 import org.apache.kafka.clients.consumer.ConsumerConfig
5 import org.apache.kafka.common.serialization.StringDeserializer
6 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
7 import org.springframework.beans.factory.annotation.Value
8 import org.springframework.context.annotation.Bean
9 import org.springframework.context.annotation.Configuration
10 import org.springframework.kafka.annotation.EnableKafka
11 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
12 import org.springframework.kafka.core.ConsumerFactory
13 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
14 import org.springframework.kafka.support.serializer.JsonDeserializer
17 open class MessagingConfig {
/**
 * Kafka consumer group id, resolved from the
 * `blueprintsprocessor.messageclient.self-service-api.groupId` property.
 * Used as the `group.id` setting of the consumer factory built by this class.
 */
19 @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
20 lateinit var groupId: String
/**
 * Kafka bootstrap servers, resolved from the
 * `blueprintsprocessor.messageclient.self-service-api.bootstrapServers` property.
 * Used as the `bootstrap.servers` setting of the consumer factory built by this class.
 */
22 @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
23 lateinit var bootstrapServers: String
/**
 * Builds the Kafka consumer factory for [ExecutionServiceInput] messages.
 *
 * The consumer settings point at [bootstrapServers], use [groupId] as the consumer
 * group, and set the offset reset policy to `earliest`. Keys are deserialized with a
 * [StringDeserializer]; values are deserialized with a [JsonDeserializer] targeting
 * [ExecutionServiceInput].
 *
 * @return a [DefaultKafkaConsumerFactory] built from those settings. The return type is
 * declared nullable, but this implementation always returns an instance.
 */
open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
    // Collect every consumer setting in a single expression.
    val consumerSettings = hashMapOf<String, Any>(
        CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
        ConsumerConfig.GROUP_ID_CONFIG to groupId,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java.name,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
    )

    // Deserializer instances are handed to the factory alongside the settings map.
    val keyDeserializer = StringDeserializer()
    val valueDeserializer = JsonDeserializer(ExecutionServiceInput::class.java)

    return DefaultKafkaConsumerFactory(consumerSettings, keyDeserializer, valueDeserializer)
}
37 * Creates the Kafka message listener container factory.
39 * @return the ConcurrentKafkaListenerContainerFactory instance.
42 open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
43 val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
44 factory.consumerFactory = consumerFactory()