2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
22 import io.micrometer.core.instrument.Timer
23 import io.micrometer.prometheus.PrometheusConfig
24 import io.micrometer.prometheus.PrometheusMeterRegistry
25 import org.onap.dcae.collectors.veshv.utils.TimeUtils
26 import reactor.core.publisher.Mono
27 import java.time.Duration
28 import java.time.Instant
29 import java.util.concurrent.atomic.AtomicLong
/**
 * Micrometer-backed metrics for the Kafka consumer, published through a [PrometheusMeterRegistry].
 *
 * Every meter name created here is prefixed with `hv-kafka-consumer.` (see `PREFIX` and `name` below).
 *
 * NOTE(review): the `override` members below imply a supertype whose declaration is not visible
 * in this excerpt - confirm which interface this class implements.
 *
 * @param registry registry the meters are created against; defaults to a fresh
 * `PrometheusMeterRegistry(PrometheusConfig.DEFAULT)`.
 */
31 internal class MicrometerMetrics constructor(
32 private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
// Gauge "hv-kafka-consumer.consumer.offset", backed by an AtomicLong that starts at 0.
// registry.gauge(...) hands back the state object passed in, so this property is that AtomicLong.
35 private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0))
// Timer "hv-kafka-consumer.travel.time" with percentile-histogram publishing switched on.
// NOTE(review): the end of this builder chain is not visible here; presumably it registers the
// timer with `registry` - confirm.
36 private val travelTime = Timer.builder(name("travel.time"))
37 .publishPercentileHistogram(true)
/**
 * Returns a [Mono] built with `Mono.fromCallable`, so the callable is evaluated per subscription
 * rather than at call time.
 *
 * NOTE(review): the callable body is not visible in this excerpt; presumably it emits the current
 * registry scrape output - confirm.
 */
40 fun lastStatus(): Mono<String> = Mono.fromCallable {
/**
 * Stores [offset] as the new value of the consumer-offset gauge.
 *
 * The write uses `AtomicLong.lazySet`, not a plain `set`.
 */
44 override fun notifyOffsetChanged(offset: Long) {
45 currentOffset.lazySet(offset)
/**
 * Records on the travel-time timer the duration between the message send time and `Instant.now()`.
 *
 * @param messageSentTimeMicros send timestamp, converted to an `Instant` with
 * `TimeUtils.epochMicroToInstant`; presumably microseconds since the Unix epoch - confirm against
 * that helper.
 */
48 override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
49 travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now()))
// Shared instance, created on first access with the default constructor arguments.
// NOTE(review): the enclosing declaration (likely a companion object) is not visible here.
53 val INSTANCE by lazy { MicrometerMetrics() }
// Common prefix applied to every meter name produced by name().
55 private const val PREFIX = "hv-kafka-consumer"
// Builds a meter name: the given segments joined with "." and prefixed with "$PREFIX.".
// Example: name("travel", "time") == "hv-kafka-consumer.travel.time".
56 private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"