start_period: 20s
depends_on:
- message-router-kafka
- - consul-server
+ - consul-config
volumes:
- ./ssl/:/etc/ves-hv/
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
- <artifactId>micrometer-registry-jmx</artifactId>
- <version>1.0.5</version>
+ <artifactId>micrometer-registry-prometheus</artifactId>
+ <version>1.0.8</version>
</dependency>
<!-- Test dependencies -->
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class FakeMetrics: Metrics {
+class FakeMetrics : Metrics {
override fun notifyBytesReceived(size: Int) {
}
import io.netty.handler.codec.http.HttpResponseStatus
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
* @since August 2018
*/
class HealthCheckApiServer(private val healthState: HealthState,
+ private val monitoring: PrometheusMetricsProvider,
private val listenAddress: SocketAddress) {
private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING)
.route { routes ->
routes.get("/health/ready", ::readinessHandler)
routes.get("/health/alive", ::livenessHandler)
+ routes.get("/monitoring/prometheus", ::monitoringHandler)
}
NettyServerHandle(ctx.bindNow())
}
private fun livenessHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
+
+ private fun monitoringHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
+ resp.sendString(monitoring.lastStatus())
+
companion object {
private val logger = Logger(HealthCheckApiServer::class)
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.ports
+
+import reactor.core.publisher.Mono
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+interface PrometheusMetricsProvider {
+ fun lastStatus(): Mono<String>
+}
WORKDIR /opt/ves-hv-collector
-ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
+ENTRYPOINT ["entry.sh"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
-COPY src/main/scripts/healthcheck.sh ./
+COPY src/main/scripts/*.sh ./
COPY target/hv-collector-main-*.jar ./
-->
<dependency>
<groupId>io.micrometer</groupId>
- <artifactId>micrometer-registry-jmx</artifactId>
+ <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
</dependencies>
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.main
-
-import arrow.syntax.function.memoize
-import io.micrometer.core.instrument.Clock
-import io.micrometer.core.instrument.Counter
-import io.micrometer.core.instrument.MeterRegistry
-import io.micrometer.jmx.JmxConfig
-import io.micrometer.jmx.JmxMeterRegistry
-import org.onap.dcae.collectors.veshv.boundary.Metrics
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class MicrometerMetrics(
- private val registry: MeterRegistry = JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM)
-) : Metrics {
-
- private val receivedBytes = registry.counter("data.received.bytes")
- private val receivedMsgCount = registry.counter("messages.received.count")
- private val receivedMsgBytes = registry.counter("messages.received.bytes")
- private val sentCountTotal = registry.counter("messages.sent.count")
-
- init {
- registry.gauge("messages.processing.count", this) {
- (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
- }
- }
-
- private val sentCount = { topic: String ->
- registry.counter("messages.sent.count", "topic", topic)
- }.memoize<String, Counter>()
-
-
- override fun notifyBytesReceived(size: Int) {
- receivedBytes.increment(size.toDouble())
- }
-
- override fun notifyMessageReceived(size: Int) {
- receivedMsgCount.increment()
- receivedMsgBytes.increment(size.toDouble())
- }
-
- override fun notifyMessageSent(topic: String) {
- sentCountTotal.increment()
- sentCount(topic).increment()
- }
-}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.metrics
+
+import arrow.syntax.function.memoize
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics
+import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
+import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
+import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics
+import io.micrometer.core.instrument.binder.system.ProcessorMetrics
+import io.micrometer.prometheus.PrometheusConfig
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class MicrometerMetrics internal constructor(
+ private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
+) : Metrics {
+
+ private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES))
+ private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
+ private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
+ private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
+
+ init {
+ registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
+ (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
+ }
+ ClassLoaderMetrics().bindTo(registry)
+ JvmMemoryMetrics().bindTo(registry)
+ JvmGcMetrics().bindTo(registry)
+ ProcessorMetrics().bindTo(registry)
+ JvmThreadMetrics().bindTo(registry)
+ }
+
+ private val sentCount = { topic: String ->
+ registry.counter("hvves.messages.sent.topic.count", "topic", topic)
+ }.memoize<String, Counter>()
+
+ val metricsProvider = MicrometerPrometheusMetricsProvider(registry)
+
+ override fun notifyBytesReceived(size: Int) {
+ receivedBytes.increment(size.toDouble())
+ }
+
+ override fun notifyMessageReceived(size: Int) {
+ receivedMsgCount.increment()
+ receivedMsgBytes.increment(size.toDouble())
+ }
+
+ override fun notifyMessageSent(topic: String) {
+ sentCountTotal.increment()
+ sentCount(topic).increment()
+ }
+
+ companion object {
+ val INSTANCE = MicrometerMetrics()
+ internal const val PREFIX = "hvves"
+ internal const val MESSAGES = "messages"
+ internal const val RECEIVED = "received"
+ internal const val BYTES = "bytes"
+ internal const val COUNT = "count"
+ internal const val DATA = "data"
+ internal const val SENT = "sent"
+ internal const val PROCESSING = "processing"
+ fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
+ }
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.metrics
+
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider
+import reactor.core.publisher.Mono
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+class MicrometerPrometheusMetricsProvider(private val registry: PrometheusMeterRegistry) : PrometheusMetricsProvider {
+ override fun lastStatus(): Mono<String> = Mono.fromCallable {
+ registry.scrape()
+ }
+}
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.ServerHandle
private fun createHealthCheckServer(config: ServerConfiguration) =
HealthCheckApiServer(
HealthState.INSTANCE,
+ MicrometerMetrics.INSTANCE.metricsProvider,
config.healthCheckApiListenAddress)
override fun serverStartedMessage(handle: ServerHandle) =
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
-import org.onap.dcae.collectors.veshv.main.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.ServerHandle
val collectorProvider = CollectorFactory(
AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
sink,
- MicrometerMetrics(),
+ MicrometerMetrics.INSTANCE,
config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
--- /dev/null
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+java ${JAVA_OPTS:-''} -cp '*:' org.onap.dcae.collectors.veshv.main.MainKt $@
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.search.RequiredSearch
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
+import io.micrometer.prometheus.PrometheusConfig
+import io.micrometer.prometheus.PrometheusMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.data.Percentage
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider
+import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
*/
object MicrometerMetricsTest : Spek({
val doublePrecision = Percentage.withPercentage(0.5)
- lateinit var registry: SimpleMeterRegistry
+ lateinit var registry: PrometheusMeterRegistry
lateinit var cut: MicrometerMetrics
beforeEachTest {
- registry = SimpleMeterRegistry()
+ registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
cut = MicrometerMetrics(registry)
}
fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
registry.meters
.filter { it is Counter }
+ .map { it as Counter }
.filterNot { it.id.name in changedCounters }
- .forEach { assertThat((it as Counter).count()).isCloseTo(0.0, doublePrecision) }
+ .forEach {
+ assertThat(it.count()).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
+ }
}
describe("notifyBytesReceived") {
- on("data.received.bytes counter") {
- val counterName = "data.received.bytes"
+ on("$PREFIX.data.received.bytes counter") {
+ val counterName = "$PREFIX.data.received.bytes"
it("should increment counter") {
val bytes = 128
}
describe("notifyMessageReceived") {
- on("messages.received.count counter") {
- val counterName = "messages.received.count"
+ on("$PREFIX.messages.received.count counter") {
+ val counterName = "$PREFIX.messages.received.count"
it("should increment counter") {
cut.notifyMessageReceived(777)
}
}
- on("messages.received.bytes counter") {
- val counterName = "messages.received.bytes"
+ on("$PREFIX.messages.received.bytes counter") {
+ val counterName = "$PREFIX.messages.received.bytes"
it("should increment counter") {
val bytes = 888
it("should leave all other counters unchanged") {
cut.notifyMessageReceived(128)
- verifyAllCountersAreUnchangedBut("messages.received.count", "messages.received.bytes")
+ verifyAllCountersAreUnchangedBut("$PREFIX.messages.received.count", "$PREFIX.messages.received.bytes")
}
}
describe("notifyMessageSent") {
- val topicName = "dmaap_topic_name"
- val counterName = "messages.sent.count"
+ val topicName1 = "PERF3GPP"
+ val topicName2 = "CALLTRACE"
- on("$counterName counter") {
+ on("$PREFIX.messages.sent.count counter") {
+ val counterName = "$PREFIX.messages.sent.count"
it("should increment counter") {
- cut.notifyMessageSent(topicName)
+ cut.notifyMessageSent(topicName1)
verifyCounter(counterName) { counter ->
assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
}
+ verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count")
}
}
- on("$counterName[topic=$topicName] counter") {
-
- it("should increment counter") {
- cut.notifyMessageSent(topicName)
+ on("$PREFIX.messages.sent.topic.count counter") {
+ val counterName = "$PREFIX.messages.sent.topic.count"
+ it("should handle counters for different topics") {
+ cut.notifyMessageSent(topicName1)
+ cut.notifyMessageSent(topicName2)
+ cut.notifyMessageSent(topicName2)
- verifyCounter(registrySearch().name(counterName).tag("topic", topicName)) { counter ->
+ verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { counter ->
assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
}
- }
- }
- it("should leave all other counters unchanged") {
- cut.notifyMessageSent(topicName)
- verifyAllCountersAreUnchangedBut(counterName)
+ verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) { counter ->
+ assertThat(counter.count()).isCloseTo(2.0, doublePrecision)
+ }
+ }
}
}