17e157d15c6ac576317dc6715163513b46bfced3
[ccsdk/cds.git] /
1 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
2
3 import com.fasterxml.jackson.databind.DeserializationFeature
4 import com.fasterxml.jackson.databind.ObjectMapper
5 import org.apache.kafka.clients.CommonClientConfigs
6 import org.apache.kafka.clients.consumer.ConsumerConfig
7 import org.apache.kafka.common.serialization.StringDeserializer
8 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
9 import org.springframework.beans.factory.annotation.Value
10 import org.springframework.context.annotation.Bean
11 import org.springframework.context.annotation.Configuration
12 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
13 import org.springframework.kafka.core.ConsumerFactory
14 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
15 import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
16 import org.springframework.kafka.support.serializer.JsonDeserializer
17
18 @Configuration
19 open class MessagingConfig {
20
21     @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
22     lateinit var groupId: String
23
24     @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
25     lateinit var bootstrapServers: String
26
27     open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
28         val configProperties = hashMapOf<String, Any>()
29         configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
30         configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
31         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
32         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
33         configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java
34         configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name
35
36         val deserializer = JsonDeserializer<ExecutionServiceInput>()
37         deserializer.setRemoveTypeHeaders(true)
38         deserializer.addTrustedPackages("*")
39
40         val jsonDeserializer =  JsonDeserializer(ExecutionServiceInput::class.java,
41                 ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
42
43         return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
44                 ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer))
45     }
46
47     /**
48      *  Creation of a Kafka MessageListener Container
49      *
50      *  @return KafkaListener instance.
51      */
52     @Bean
53     open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
54         val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
55         factory.consumerFactory = consumerFactory()
56         return factory
57     }
58 }