1 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
3 import com.fasterxml.jackson.databind.DeserializationFeature
4 import com.fasterxml.jackson.databind.ObjectMapper
5 import org.apache.kafka.clients.CommonClientConfigs
6 import org.apache.kafka.clients.consumer.ConsumerConfig
7 import org.apache.kafka.common.serialization.StringDeserializer
8 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
9 import org.springframework.beans.factory.annotation.Value
10 import org.springframework.context.annotation.Bean
11 import org.springframework.context.annotation.Configuration
12 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
13 import org.springframework.kafka.core.ConsumerFactory
14 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
15 import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
16 import org.springframework.kafka.support.serializer.JsonDeserializer
19 open class MessagingConfig {
21 @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
22 lateinit var groupId: String
24 @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
25 lateinit var bootstrapServers: String
27 open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
28 val configProperties = hashMapOf<String, Any>()
29 configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
30 configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
31 configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
32 configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
33 configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java
34 configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name
36 val deserializer = JsonDeserializer<ExecutionServiceInput>()
37 deserializer.setRemoveTypeHeaders(true)
38 deserializer.addTrustedPackages("*")
40 val jsonDeserializer = JsonDeserializer(ExecutionServiceInput::class.java,
41 ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
43 return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
44 ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer))
48 * Creation of a Kafka MessageListener Container
50 * @return KafkaListener instance.
53 open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
54 val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
55 factory.consumerFactory = consumerFactory()