a0932e91608e08f8d42309e275f87c9d6cf1b972
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.runBlocking
23 import org.apache.kafka.clients.consumer.Consumer
24 import org.apache.kafka.clients.consumer.ConsumerRecord
25 import org.apache.kafka.clients.consumer.KafkaConsumer
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
27 import org.onap.ccsdk.cds.controllerblueprints.core.logger
28 import java.time.Duration
29 import kotlin.concurrent.thread
30
/**
 * Kafka implementation of [BlueprintMessageConsumerService].
 *
 * Each call to `subscribe` or `consume` creates a Kafka consumer, subscribes it to the requested
 * topics and starts a dedicated thread that polls until [keepGoing] is set to `false`
 * (see [shutDown]). `subscribe` forwards every polled record into [channel]; `consume` hands each
 * polled batch to the supplied consumer function instead.
 *
 * @param messageConsumerProperties source of the consumer configuration, the default topic list,
 * the client id used in the polling thread name and the poll timeout in milliseconds.
 */
open class KafkaMessageConsumerService(
    private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
) :
    BlueprintMessageConsumerService {

    val log = logger(KafkaMessageConsumerService::class)

    /** Channel the `subscribe` polling thread sends records into; cancelled by [shutDown]. */
    val channel = Channel<ConsumerRecord<String, ByteArray>>()

    /** The consumer created by the most recent `subscribe` or `consume` call, or `null` before the first one. */
    var kafkaConsumer: Consumer<String, ByteArray>? = null

    /** Polling loops run while this is `true`; [shutDown] sets it to `false`. */
    @Volatile
    var keepGoing = true

    /**
     * Creates a new Kafka consumer from the configured properties.
     *
     * @param additionalConfig entries added to, or overriding, the configuration returned by the
     * consumer properties; `null` leaves that configuration untouched.
     * @return a freshly constructed [KafkaConsumer].
     */
    fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
        val configProperties = messageConsumerProperties.getConfig()
        /** add or override already set properties */
        additionalConfig?.let { configProperties.putAll(it) }
        /** Create Kafka consumer */
        return KafkaConsumer(configProperties)
    }

    /**
     * Subscribes to the comma separated topics configured in the consumer properties.
     *
     * @param additionalConfig optional consumer configuration overrides.
     * @return the [channel] that polled records are sent to.
     * @throws IllegalStateException if no topic is configured.
     */
    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
        return subscribe(configuredTopics(), additionalConfig)
    }

    /**
     * Subscribes to [topics] and starts a thread that polls Kafka and sends every record to [channel].
     *
     * @param topics topic names to subscribe to.
     * @param additionalConfig optional consumer configuration overrides.
     * @return the [channel] that polled records are sent to.
     */
    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
        val consumer = subscribedConsumer(topics, additionalConfig)

        // Raise the flag before the thread starts so that a shutDown() issued right after this
        // method returns cannot be overwritten by the polling thread.
        keepGoing = true
        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
            // `use` closes the consumer when the loop ends, normally or through an exception.
            consumer.use { kc ->
                while (keepGoing) {
                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
                    log.trace("Consumed Records : ${consumerRecords.count()}")
                    runBlocking {
                        consumerRecords.forEach { consumerRecord ->
                            // NOTE(review): the channel can still be cancelled between this check
                            // and the send below, in which case send() throws - confirm whether
                            // that should be handled here.
                            if (!channel.isClosedForSend) {
                                channel.send(consumerRecord)
                            } else {
                                log.error("Channel is closed to receive message")
                            }
                        }
                    }
                }
                log.info("message listener shutting down.....")
            }
        }
        return channel
    }

    /**
     * Consumes the comma separated topics configured in the consumer properties.
     *
     * @param additionalConfig optional consumer configuration overrides.
     * @param consumerFunction callback invoked for every polled batch.
     * @throws IllegalStateException if no topic is configured.
     */
    override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
        consume(topics = configuredTopics(), additionalConfig = additionalConfig, consumerFunction = consumerFunction)
    }

    /**
     * Subscribes to [topics] and starts a thread that polls Kafka and passes every polled batch,
     * together with the consumer properties and the consumer itself, to [consumerFunction].
     *
     * @param topics topic names to subscribe to.
     * @param additionalConfig optional consumer configuration overrides.
     * @param consumerFunction callback for polled batches; it is cast to
     * [KafkaConsumerRecordsFunction], so any other implementation fails with a [ClassCastException].
     */
    override suspend fun consume(
        topics: List<String>,
        additionalConfig: Map<String, Any>?,
        consumerFunction: ConsumerFunction
    ) {
        val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
        val consumer = subscribedConsumer(topics, additionalConfig)

        // Raise the flag before the thread starts so that a shutDown() issued right after this
        // method returns cannot be overwritten by the polling thread.
        keepGoing = true
        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
            // `use` closes the consumer when the loop ends, normally or through an exception.
            consumer.use { kc ->
                while (keepGoing) {
                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
                    log.trace("Consumed Records : ${consumerRecords.count()}")
                    runBlocking {
                        /** Execute dynamic consumer Block substitution */
                        kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
                    }
                }
                log.info("message listener shutting down.....")
            }
        }
    }

    /**
     * Stops the polling loops, cancels [channel] and then suspends for one poll interval so that a
     * poll already in progress has time to return.
     */
    override suspend fun shutDown() {
        /** stop the polling loop */
        keepGoing = false
        /** Close the Channel */
        channel.cancel()
        /** TO shutdown gracefully, need to wait for the maximum poll time */
        delay(messageConsumerProperties.pollMillSec)
    }

    /**
     * Splits the configured comma separated topic property into trimmed, non-blank topic names.
     *
     * @throws IllegalStateException if the property is missing or yields no topic name.
     */
    private fun configuredTopics(): List<String> {
        val topicNames = messageConsumerProperties.topic
            ?.split(",")
            ?.map { it.trim() }
            // Drop blanks such as the single "" produced by an empty property or by "a,,b".
            ?.filter { it.isNotEmpty() }
        check(!topicNames.isNullOrEmpty()) { "couldn't get topic information" }
        return topicNames
    }

    /**
     * Creates a consumer, stores it in [kafkaConsumer], subscribes it to [topics] and returns it,
     * so the caller's polling thread keeps using this exact instance even if the property is
     * reassigned by a later call.
     */
    private fun subscribedConsumer(topics: List<String>, additionalConfig: Map<String, Any>?): Consumer<String, ByteArray> {
        /** Create Kafka consumer */
        kafkaConsumer = kafkaConsumer(additionalConfig)

        val consumer = checkNotNull(kafkaConsumer) {
            "failed to create kafka consumer for " +
                "server(${messageConsumerProperties.bootstrapServers})'s " +
                "topics($topics)"
        }

        consumer.subscribe(topics)
        log.info("Successfully consumed topic($topics)")
        return consumer
    }
}