906fce23680169eb195232a9f2c9c2d7e0f62d0c
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-kafka-consumer / src / main / kotlin / org / onap / dcae / collectors / veshv / kafkaconsumer / metrics / MicrometerMetrics.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
21
22 import arrow.syntax.function.memoize
23 import io.micrometer.core.instrument.Tag
24 import io.micrometer.core.instrument.Timer
25 import io.micrometer.prometheus.PrometheusConfig
26 import io.micrometer.prometheus.PrometheusMeterRegistry
27 import org.apache.kafka.common.TopicPartition
28 import org.onap.dcae.collectors.veshv.utils.TimeUtils
29 import reactor.core.publisher.Mono
30 import java.time.Duration
31 import java.time.Instant
32 import java.util.concurrent.atomic.AtomicLong
33
34 internal class MicrometerMetrics constructor(
35         private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
36 ) : Metrics {
37
38     private val currentOffsetByTopicPartition = { topicPartition: String ->
39         registry.gauge(name(OFFSET, PARTITION),
40                 listOf(Tag.of(PARTITION, topicPartition)),
41                 AtomicLong(0))
42     }.memoize<String, AtomicLong>()
43
44     private val travelTime = Timer.builder(name(TRAVEL,TIME))
45             .publishPercentileHistogram(true)
46             .register(registry)
47
48     fun lastStatus(): Mono<String> = Mono.fromCallable {
49         registry.scrape()
50     }
51
52     override fun notifyOffsetChanged(offset: Long, topicPartition: TopicPartition) {
53         currentOffsetByTopicPartition(topicPartition.toString()).set(offset)
54     }
55
56     override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
57         travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now()))
58     }
59
60     companion object {
61
62         val INSTANCE by lazy { MicrometerMetrics() }
63         private const val OFFSET = "offset"
64         private const val PARTITION = "partition"
65         private const val TRAVEL = "travel"
66         private const val TIME = "time"
67         private const val PREFIX = "hv-kafka-consumer"
68         private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
69     }
70 }