Merge "Created media folders for ResourceDictionary"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / MessagingConfig.kt
1 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
2
3 import org.apache.kafka.clients.CommonClientConfigs
4 import org.apache.kafka.clients.consumer.ConsumerConfig
5 import org.apache.kafka.common.serialization.StringDeserializer
6 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
7 import org.springframework.beans.factory.annotation.Value
8 import org.springframework.context.annotation.Bean
9 import org.springframework.context.annotation.Configuration
10 import org.springframework.kafka.annotation.EnableKafka
11 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
12 import org.springframework.kafka.core.ConsumerFactory
13 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
14 import org.springframework.kafka.support.serializer.JsonDeserializer
15
/**
 * Spring configuration for the Kafka consumer side of the self-service API.
 *
 * The consumer group id and the bootstrap server list are injected from the
 * `blueprintsprocessor.messageclient.self-service-api.*` properties.
 *
 * NOTE(review): this class itself is not annotated with `@EnableKafka`; presumably
 * Kafka listener annotation processing is enabled elsewhere - confirm.
 */
@Configuration
open class MessagingConfig {

    /** Kafka consumer group id, injected from the `groupId` property. */
    @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
    lateinit var groupId: String

    /** Kafka bootstrap server list, injected from the `bootstrapServers` property. */
    @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
    lateinit var bootstrapServers: String

    /**
     * Builds the consumer factory for records keyed by [String] and carrying an
     * [ExecutionServiceInput] value.
     *
     * The consumer properties hold the bootstrap servers, the group id, the
     * deserializer class names and an `earliest` auto-offset-reset policy. A
     * [StringDeserializer] and a [JsonDeserializer] targeting [ExecutionServiceInput]
     * are additionally handed over as instances.
     *
     * NOTE(review): the deserializers are configured both as class-name properties and
     * as instances; presumably the instances take precedence - confirm.
     *
     * @return a new [DefaultKafkaConsumerFactory]; this implementation never returns
     * `null` even though the declared type is nullable.
     */
    open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
        // Same entries as before, assembled in one call instead of entry by entry.
        val consumerSettings = hashMapOf<String, Any>(
            CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            ConsumerConfig.GROUP_ID_CONFIG to groupId,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java.name,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
        )

        return DefaultKafkaConsumerFactory(
            consumerSettings,
            StringDeserializer(),
            JsonDeserializer(ExecutionServiceInput::class.java)
        )
    }

    /**
     * Exposes the Kafka listener container factory as a Spring bean.
     *
     * @return a [ConcurrentKafkaListenerContainerFactory] whose consumer factory is the
     * one produced by [consumerFactory].
     */
    @Bean
    open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> =
        ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>().also { containerFactory ->
            containerFactory.consumerFactory = consumerFactory()
        }
}