HvVesCustomRules:
active: true
SuboptimalLoggerUsage:
- active: true
+ active: false
command: ["--listen-port", "6062",
"--ves-host", "ves-hv-collector",
"--ves-port", "6061",
+ "--key-store", "/etc/ves-hv/client.p12",
"--key-store-password", "onaponap",
"--trust-store-password", "onaponap"]
depends_on:
</modules>
<properties>
- <kotlin.version>1.3.0</kotlin.version>
+ <kotlin.version>1.3.11</kotlin.version>
<arrow.version>0.8.0</arrow.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
.decode(rawPayload)
- .filterFailedWithLog(logger, clientContext::asMap,
+ .filterFailedWithLog(logger, clientContext::fullMdc,
{ "Ves event header decoded successfully" },
{ "Failed to decode ves event header, reason: ${it.message}" })
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
- .filterEmptyWithLog(logger, clientContext::asMap,
+ .filterEmptyWithLog(logger, clientContext::fullMdc,
{ "Found route for message: ${it.topic}, partition: ${it.partition}" },
{ "Could not find route for message" })
.also { logger.debug { "Released buffer memory after handling message stream" } }
fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
- filterFailedWithLog(logger, clientContext::asMap, predicate)
+ filterFailedWithLog(logger, clientContext::fullMdc, predicate)
companion object {
private val logger = Logger(VesHvCollector::class)
@Suppress("TooManyFunctions")
internal object ClientContextLogging {
- fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
- fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
- fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
- fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
- fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+ fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block)
+ fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block)
+ fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block)
+ fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block)
+ fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block)
- fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
- fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
- fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
- fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
- fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+ fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message)
+ fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message)
+ fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message)
+ fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message)
+ fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message)
fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable,
returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux)
+ return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux)
}
}
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
import reactor.retry.Retry
import java.io.StringReader
import java.time.Duration
+import java.util.*
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
- logger.withWarn { log("Could not get fresh configuration", it.exception()) }
+ logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) }
healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
.map(::createCollectorConfiguration)
.retryWhen(retry)
- private fun askForConfig(): Mono<String> = http.get(url)
+ private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
+ val invocationId = UUID.randomUUID()
+ http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
+ }
- private fun filterDifferentValues(configurationString: String) =
- hashOf(configurationString).let {
- if (it == lastConfigurationHash.get()) {
- logger.trace { "No change detected in consul configuration" }
- Mono.empty()
- } else {
- logger.info { "Obtained new configuration from consul:\n${configurationString}" }
- lastConfigurationHash.set(it)
- Mono.just(configurationString)
+ private fun filterDifferentValues(configuration: BodyWithInvocationId) =
+ configuration.body.let { configurationString ->
+ hashOf(configurationString).let {
+ if (it == lastConfigurationHash.get()) {
+ logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "No change detected in consul configuration"
+ }
+ Mono.empty()
+ } else {
+ logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "Obtained new configuration from consul:\n${configurationString}"
+ }
+ lastConfigurationHash.set(it)
+ Mono.just(configurationString)
+ }
}
}
private const val BACKOFF_INTERVAL_FACTOR = 30L
private val logger = Logger(ConsulConfigurationProvider::class)
}
-}
+ private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
+}
import io.netty.handler.codec.http.HttpStatusClass
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
+import java.util.*
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
*/
open class HttpAdapter(private val httpClient: HttpClient) {
- open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
- .get()
- .uri(url + createQueryString(queryParams))
- .responseSingle { response, content ->
- if (response.status().codeClass() == HttpStatusClass.SUCCESS)
- content.asString()
- else {
- val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
- Mono.error(IllegalStateException(errorMessage))
- }
- }
- .doOnError {
- logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
- logger.withDebug { log("Nested exception:", it) }
- }
+ open fun get(url: String, invocationId: UUID, queryParams: Map<String, Any> = emptyMap()): Mono<String> =
+ httpClient
+ .headers { it[INVOCATION_ID_HEADER] = invocationId.toString() }
+ .get()
+ .uri(url + createQueryString(queryParams))
+ .responseSingle { response, content ->
+ if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+ content.asString()
+ else {
+ val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
+ Mono.error(IllegalStateException(errorMessage))
+ }
+ }
+ .doOnError {
+ logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
+ logger.withDebug { log("Nested exception:", it) }
+ }
private fun createQueryString(params: Map<String, Any>): String {
if (params.isEmpty())
}
companion object {
-
-
private val logger = Logger(HttpAdapter::class)
+ const val INVOCATION_ID_HEADER = "X-${OnapMdc.INVOCATION_ID}"
}
}
}
private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx::asMap, Marker.INVOKE) {
+ logger.trace(ctx::fullMdc, Marker.Invoke()) {
val msgNum = sentMessages.incrementAndGet()
"Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
}
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import arrow.core.None
+import arrow.core.Option
import arrow.core.getOrElse
+import arrow.core.toOption
import arrow.effects.IO
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ClientContext
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
import java.time.Duration
+import java.lang.Exception
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
val clientContext = ClientContext(nettyOutbound.alloc())
nettyInbound.withConnection {
- clientContext.clientAddress = it.address()
+ populateClientContext(clientContext, it)
+ it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
+ sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+ }
+
}
- logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" }
+ logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
return collectorProvider(clientContext).fold(
{
- logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+ logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
Mono.empty()
},
{
- logger.info(clientContext::asMap) { "Handling new connection" }
+ logger.info(clientContext::fullMdc) { "Handling new connection" }
nettyInbound.withConnection { conn ->
conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
.logConnectionClosed(clientContext)
)
}
+ private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
+ clientContext.clientAddress = try {
+ Option.fromNullable(connection.address().address)
+ } catch (ex: Exception) {
+ None
+ }
+ clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
+ }
+
+ private fun getSslSession(connection: Connection) = Option.fromNullable(
+ connection
+ .channel()
+ .pipeline()
+ .get(SslHandler::class.java)
+ ?.engine()
+ ?.session)
+
+ private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
+ sslSession
+ .peerCertificates
+ .firstOption()
+ .flatMap { Option.fromNullable(it as? X509Certificate) }
+
private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
- logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." }
+ logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
if (it.isSuccess)
logger.debug(ctx) { "Channel closed successfully." }
else
private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
// TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
- logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" }
+ logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
}
return this
}
*/
package org.onap.dcae.collectors.veshv.model
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.getOrElse
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.net.InetSocketAddress
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.InetAddress
+import java.security.cert.X509Certificate
import java.util.*
/**
*/
data class ClientContext(
val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT,
- val clientId: String = UUID.randomUUID().toString(),
- var clientAddress: InetSocketAddress? = null) {
- fun asMap(): Map<String, String> {
- val result = mutableMapOf("clientId" to clientId)
- if (clientAddress != null) {
- result["clientAddress"] = clientAddress.toString()
- }
- return result
+ var clientAddress: Option<InetAddress> = None,
+ var clientCert: Option<X509Certificate> = None,
+ val requestId: String = UUID.randomUUID().toString(), // Should be somehow propagated to DMAAP
+ val invocationId: String = UUID.randomUUID().toString()) {
+
+ val mdc: Map<String, String>
+ get() = mapOf(
+ OnapMdc.REQUEST_ID to requestId,
+ OnapMdc.INVOCATION_ID to invocationId,
+ OnapMdc.STATUS_CODE to DEFAULT_STATUS_CODE,
+ OnapMdc.CLIENT_NAME to clientDn().getOrElse { DEFAULT_VALUE },
+ OnapMdc.CLIENT_IP to clientIp().getOrElse { DEFAULT_VALUE }
+ )
+
+ val fullMdc: Map<String, String>
+ get() = mdc + ServiceContext.mdc
+
+ private fun clientDn(): Option<String> = clientCert.map { it.subjectX500Principal.toString() }
+ private fun clientIp(): Option<String> = clientAddress.map(InetAddress::getHostAddress)
+
+ companion object {
+ const val DEFAULT_STATUS_CODE = "INPROGRESS"
+ const val DEFAULT_VALUE = ""
}
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.InetAddress
+import java.net.UnknownHostException
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+object ServiceContext {
+ val instanceId = UUID.randomUUID().toString()
+ val serverFqdn = getHost().hostName
+
+ val mdc = mapOf(
+ OnapMdc.INSTANCE_ID to instanceId,
+ OnapMdc.SERVER_FQDN to serverFqdn
+ )
+
+ private fun getHost() = try {
+ InetAddress.getLocalHost()
+ } catch (ex: UnknownHostException) {
+ InetAddress.getLoopbackAddress()
+ }
+}
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Option
import arrow.core.Try
-import arrow.core.success
import com.google.protobuf.ByteString
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import reactor.test.test
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
*/
package org.onap.dcae.collectors.veshv.impl.adapters
+import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
on("call to consul") {
- whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+ whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap()))
.thenReturn(Mono.just(constructConsulResponse()))
it("should use received configuration") {
)
on("call to consul") {
- whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+ whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap()))
.thenReturn(Mono.error(RuntimeException("Test exception")))
it("should interrupt the flux") {
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.impl.adapters.HttpAdapter.Companion.INVOCATION_ID_HEADER
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.http.server.HttpServer
import reactor.test.StepVerifier
import reactor.test.test
+import java.util.*
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
routes.get("/url") { req, resp ->
resp.sendString(Mono.just(req.uri()))
}
+ routes.get("/inv-id") { req, resp ->
+ resp.sendString(Mono.just(req.requestHeaders()[INVOCATION_ID_HEADER]))
+ }
}
.bindNow()
val baseUrl = "http://${httpServer.host()}:${httpServer.port()}"
given("url without query params") {
val url = "/url"
+ val invocationId = UUID.randomUUID()
it("should not append query string") {
- httpAdapter.get(url).test()
+ httpAdapter.get(url, invocationId).test()
.expectNext(url)
.verifyComplete()
}
+
+ it("should pass invocation id") {
+ httpAdapter.get("/inv-id", invocationId).test()
+ .expectNext(invocationId.toString())
+ .verifyComplete()
+ }
}
given("url with query params") {
val queryParams = mapOf(Pair("p", "the-value"))
val url = "/url"
+ val invocationId = UUID.randomUUID()
it("should add them as query string to the url") {
- httpAdapter.get(url, queryParams).test()
+ httpAdapter.get(url, invocationId, queryParams).test()
.expectNext("/url?p=the-value")
.verifyComplete()
}
+
+ it("should pass invocation id") {
+ httpAdapter.get("/inv-id", invocationId, queryParams).test()
+ .expectNext(invocationId.toString())
+ .verifyComplete()
+ }
}
given("invalid url") {
val invalidUrl = "/wtf"
+ val invocationId = UUID.randomUUID()
it("should interrupt the flux") {
StepVerifier
- .create(httpAdapter.get(invalidUrl))
+ .create(httpAdapter.get(invalidUrl, invocationId))
.verifyError()
}
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.net.Inet4Address
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.security.cert.X509Certificate
+import java.util.*
+import javax.security.auth.x500.X500Principal
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object ClientContextTest : Spek({
+ describe("ClientContext") {
+ given("default instance") {
+ val cut = ClientContext()
+
+ on("mapped diagnostic context") {
+ val mdc = cut.mdc
+
+ it("should contain ${OnapMdc.REQUEST_ID}") {
+ assertThat(mdc[OnapMdc.REQUEST_ID]).isEqualTo(cut.requestId)
+ }
+
+ it("should contain ${OnapMdc.INVOCATION_ID}") {
+ assertThat(mdc[OnapMdc.INVOCATION_ID]).isEqualTo(cut.invocationId)
+ }
+
+ it("should contain ${OnapMdc.STATUS_CODE}") {
+ assertThat(mdc[OnapMdc.STATUS_CODE]).isEqualTo("INPROGRESS")
+ }
+
+ it("should contain ${OnapMdc.CLIENT_NAME}") {
+ assertThat(mdc[OnapMdc.CLIENT_NAME]).isBlank()
+ }
+
+ it("should contain ${OnapMdc.CLIENT_IP}") {
+ assertThat(mdc[OnapMdc.CLIENT_IP]).isBlank()
+ }
+ }
+ }
+
+ given("instance with client data") {
+ val clientDn = "C=PL, O=Nokia, CN=NokiaBTS"
+ val clientIp = "192.168.52.34"
+ val cert: X509Certificate = mock()
+ val principal: X500Principal = mock()
+ val cut = ClientContext(
+ clientAddress = Some(InetAddress.getByName(clientIp)),
+ clientCert = Some(cert))
+
+ whenever(cert.subjectX500Principal).thenReturn(principal)
+ whenever(principal.toString()).thenReturn(clientDn)
+
+ on("mapped diagnostic context") {
+ val mdc = cut.mdc
+
+ it("should contain ${OnapMdc.CLIENT_NAME}") {
+ assertThat(mdc[OnapMdc.CLIENT_NAME]).isEqualTo(clientDn)
+ }
+
+ it("should contain ${OnapMdc.CLIENT_IP}") {
+ assertThat(mdc[OnapMdc.CLIENT_IP]).isEqualTo(clientIp)
+ }
+ }
+ }
+ }
+})
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object ServiceContextTest : Spek({
+ describe("ServiceContext") {
+ given("singleton instance") {
+ val cut = ServiceContext
+
+ on("instanceId") {
+ val instanceId = cut.instanceId
+ it("should be valid UUID") {
+ UUID.fromString(instanceId) // should not throw
+ }
+ }
+
+ on("serverFqdn") {
+ val serverFqdn = cut.serverFqdn
+ it("should be non empty") {
+ assertThat(serverFqdn).isNotBlank()
+ }
+ }
+
+ on("mapped diagnostic context") {
+ val mdc = cut.mdc
+
+ it("should contain ${OnapMdc.INSTANCE_ID}") {
+ assertThat(mdc[OnapMdc.INSTANCE_ID]).isEqualTo(cut.instanceId)
+ }
+
+ it("should contain ${OnapMdc.SERVER_FQDN}") {
+ assertThat(mdc[OnapMdc.SERVER_FQDN]).isEqualTo(cut.serverFqdn)
+ }
+ }
+ }
+ }
+})
abstract fun log(message: String)
abstract fun log(message: String, t: Throwable)
abstract fun log(marker: Marker, message: String)
+
open val enabled: Boolean
get() = true
}
}
}
+
+ protected fun withAdditionalMdc(mdc: Map<String, String>, block: () -> Unit) {
+ if (mdc.isEmpty()) {
+ block()
+ } else {
+ try {
+ mdc.forEach(MDC::put)
+ block()
+ } finally {
+ mdc.keys.forEach(MDC::remove)
+ }
+ }
+ }
}
object OffLevelLogger : AtLevelLogger() {
logger.error(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.error(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.error(marker.slf4jMarker, message)
+ }
}
@Suppress("SuboptimalLoggerUsage")
logger.warn(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.warn(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.warn(marker.slf4jMarker, message)
+ }
}
@Suppress("SuboptimalLoggerUsage")
logger.info(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.info(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.info(marker.slf4jMarker, message)
+ }
}
@Suppress("SuboptimalLoggerUsage")
logger.debug(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.debug(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.debug(marker.slf4jMarker, message)
+ }
}
@Suppress("SuboptimalLoggerUsage")
logger.trace(message, t)
}
- override fun log(marker: Marker, message: String) {
- logger.trace(marker(), message)
- }
+ override fun log(marker: Marker, message: String) =
+ withAdditionalMdc(marker.mdc) {
+ logger.trace(marker.slf4jMarker, message)
+ }
}
package org.onap.dcae.collectors.veshv.utils.logging
import org.slf4j.MarkerFactory
+import java.time.Instant
+import java.util.*
-enum class Marker(private val marker: org.slf4j.Marker) {
- ENTRY(MarkerFactory.getMarker("ENTRY")),
- EXIT(MarkerFactory.getMarker("EXIT")),
- INVOKE(MarkerFactory.getMarker("INVOKE"));
+sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map<String, String> = emptyMap()) {
- operator fun invoke() = marker
+ object Entry : Marker(ENTRY)
+ object Exit : Marker(EXIT)
+
+ class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : Marker(INVOKE, mdc(id, timestamp)) {
+ companion object {
+ private fun mdc(id: UUID, timestamp: Instant) = mapOf(
+ OnapMdc.INVOCATION_ID to id.toString(),
+ OnapMdc.INVOCATION_TIMESTAMP to timestamp.toString()
+ )
+ }
+ }
+
+ companion object {
+ private val ENTRY = MarkerFactory.getMarker("ENTRY")
+ private val EXIT = MarkerFactory.getMarker("EXIT")
+ private val INVOKE = MarkerFactory.getMarker("INVOKE")
+ }
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+object OnapMdc {
+ const val REQUEST_ID = "RequestID"
+ const val CLIENT_NAME = "PartnerName"
+ const val CLIENT_IP = "ClientIPAddress"
+ const val INVOCATION_ID = "InvocationID"
+ const val INVOCATION_TIMESTAMP = "InvokeTimestamp"
+ const val STATUS_CODE = "StatusCode"
+ const val INSTANCE_ID = "InstanceID"
+ const val SERVER_FQDN = "ServerFQDN"
+}