dd24345dbe8ce2751e7d34aad6b6397b956d06ca
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
21
22 import kotlinx.coroutines.CoroutineDispatcher
23 import kotlinx.coroutines.Dispatchers
24 import kotlinx.coroutines.GlobalScope
25 import kotlinx.coroutines.Job
26 import kotlinx.coroutines.delay
27 import kotlinx.coroutines.isActive
28 import kotlinx.coroutines.launch
29 import org.apache.kafka.clients.consumer.KafkaConsumer
30 import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
31
32 internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
33                            private val topics: Set<String>,
34                            private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
35     suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job =
36             GlobalScope.launch(dispatcher) {
37                 kafkaConsumer.subscribe(topics)
38                 val topicPartitions = kafkaConsumer.assignment()
39                 while (isActive) {
40                     kafkaConsumer.endOffsets(topicPartitions)
41                             .forEach { (topicPartition, offset) ->
42                                 offsetConsumer.update(topicPartition, offset)
43                             }
44                     kafkaConsumer.commitSync()
45                     delay(updateInterval)
46                 }
47             }
48 }