66169534094afb73bdc7c81ec2f63cd4525ebc91
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
21
22 import arrow.core.Option
23 import arrow.effects.IO
24 import org.onap.dcae.collectors.veshv.utils.logging.Logger
25 import reactor.kafka.receiver.ReceiverRecord
26
27 /**
28  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
29  * @since June 2018
30  */
31
32 class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>)
33
34 interface ConsumerStateProvider {
35     fun currentState(): ConsumerState
36     fun reset(): IO<Unit>
37 }
38
39 class Consumer : ConsumerStateProvider {
40     private var msgCount = 0L
41     private var lastKey: ByteArray? = null
42     private var lastValue: ByteArray? = null
43
44     override fun currentState() =
45             ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue))
46
47     override fun reset() = IO {
48         synchronized(this) {
49             msgCount = 0
50             lastKey = null
51             lastValue = null
52         }
53     }
54
55     fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
56         logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
57
58         synchronized(this) {
59             msgCount++
60             lastKey = record.key()
61             lastValue = record.value()
62         }
63     }
64
65     companion object {
66         private val logger = Logger(Consumer::class)
67     }
68 }