2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
 
  22 import arrow.syntax.function.memoize
 
  23 import io.micrometer.core.instrument.Tag
 
  24 import io.micrometer.core.instrument.Timer
 
  25 import io.micrometer.prometheus.PrometheusConfig
 
  26 import io.micrometer.prometheus.PrometheusMeterRegistry
 
  27 import org.apache.kafka.common.TopicPartition
 
  28 import org.onap.dcae.collectors.veshv.utils.TimeUtils
 
  29 import reactor.core.publisher.Mono
 
  30 import java.time.Duration
 
  31 import java.time.Instant
 
  32 import java.util.concurrent.atomic.AtomicLong
 
  34 internal class MicrometerMetrics constructor(
 
  35         private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
 
  38     private val currentOffsetByTopicPartition = { topic: String ->
 
  39         registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0))
 
  40     }.memoize<String, AtomicLong>()
 
  42     private val travelTime = Timer.builder(name(TRAVEL,TIME))
 
  43             .publishPercentileHistogram(true)
 
  46     fun lastStatus(): Mono<String> = Mono.fromCallable {
 
  50     override fun notifyOffsetChanged(offset: Long, topicPartition: TopicPartition) {
 
  51         currentOffsetByTopicPartition(topicPartition.toString()).set(offset)
 
  54     override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
 
  55         travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now()))
 
  60         val INSTANCE by lazy { MicrometerMetrics() }
 
  61         private const val CONSUMER = "consumer"
 
  62         private const val OFFSET = "offset"
 
  63         private const val TOPIC = "topic"
 
  64         private const val TRAVEL = "travel"
 
  65         private const val TIME = "time"
 
  66         private const val PREFIX = "hv-kafka-consumer"
 
  67         private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"