import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
+ private val ctx: ClientContext) : Sink {
private val sentMessages = AtomicLong(0)
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
}
private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace(ctx) {
+ logger.trace(ctx::asMap, Marker.INVOKE) {
val msgNum = sentMessages.incrementAndGet()
"Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
}
import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
clientContext.clientAddress = it.address()
}
+ logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" }
return collectorProvider(clientContext).fold(
{
logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
Mono.empty()
},
{
- logger.info { "Handling new connection" }
+ logger.info(clientContext::asMap) { "Handling new connection" }
nettyInbound.withConnection { conn ->
conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
.logConnectionClosed(clientContext)
private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
+ logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." }
if (it.isSuccess)
logger.debug(ctx) { "Channel closed successfully." }
else
private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
- logger.info(ctx) { "Connection has been closed" }
+ // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
+ logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" }
}
return this
}
fun error(mdc: MappedDiagnosticContext, message: () -> String) =
errorLogger.withMdc(mdc) { log(message()) }
+ fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ errorLogger.withMdc(mdc) { log(marker, message()) }
+
// WARN
fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
fun warn(mdc: MappedDiagnosticContext, message: () -> String) =
warnLogger.withMdc(mdc) { log(message()) }
+ fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ warnLogger.withMdc(mdc) { log(marker, message()) }
// INFO
fun info(mdc: MappedDiagnosticContext, message: () -> String) =
infoLogger.withMdc(mdc) { log(message()) }
+ fun info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ infoLogger.withMdc(mdc) { log(marker, message()) }
+
// DEBUG
fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block()
fun debug(mdc: MappedDiagnosticContext, message: () -> String) =
debugLogger.withMdc(mdc) { log(message()) }
+ fun debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ debugLogger.withMdc(mdc) { log(marker, message()) }
// TRACE
fun trace(mdc: MappedDiagnosticContext, message: () -> String) =
traceLogger.withMdc(mdc) { log(message()) }
+ fun trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ traceLogger.withMdc(mdc) { log(marker, message()) }
+
}
abstract class AtLevelLogger {
abstract fun log(message: String)
abstract fun log(message: String, t: Throwable)
+ abstract fun log(marker: Marker, message: String)
open val enabled: Boolean
get() = true
override fun log(message: String, t: Throwable) {
// do not log anything
}
+
+ override fun log(marker: Marker, message: String) {
+ // do not log anything
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.error(message)
override fun log(message: String, t: Throwable) {
logger.error(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.error(marker(), message)
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.warn(message)
override fun log(message: String, t: Throwable) {
logger.warn(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.warn(marker(), message)
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.info(message)
override fun log(message: String, t: Throwable) {
logger.info(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.info(marker(), message)
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.debug(message)
override fun log(message: String, t: Throwable) {
logger.debug(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.debug(marker(), message)
+ }
}
+@Suppress("SuboptimalLoggerUsage")
class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
override fun log(message: String) {
logger.trace(message)
override fun log(message: String, t: Throwable) {
logger.trace(message, t)
}
+
+ override fun log(marker: Marker, message: String) {
+ logger.trace(marker(), message)
+ }
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import org.slf4j.MarkerFactory
+
+enum class Marker(private val marker: org.slf4j.Marker) {
+ ENTRY(MarkerFactory.getMarker("ENTRY")),
+ EXIT(MarkerFactory.getMarker("EXIT")),
+ INVOKE(MarkerFactory.getMarker("INVOKE"));
+
+ operator fun invoke() = marker
+}