Add of adapter fun with param ClientContext 00/74600/2
authorkjaniak <kornel.janiak@nokia.com>
Thu, 13 Dec 2018 06:39:51 +0000 (07:39 +0100)
committerkjaniak <kornel.janiak@nokia.com>
Thu, 13 Dec 2018 07:54:05 +0000 (08:54 +0100)
Change-Id: Ib9ac6ab16c51ddf40ae849f4ce04ab7e25609f56
Issue-ID: DCAEGEN2-1031
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt [new file with mode: 0644]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt

index 0977595..6105b58 100644 (file)
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Option
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.Routing
 import org.onap.dcae.collectors.veshv.model.VesMessage
index 0d07504..cf73aed 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -33,7 +34,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
@@ -56,7 +56,7 @@ internal class VesHvCollector(
                     .transform(::decodeProtobufPayload)
                     .transform(::filterInvalidProtobufMessages)
                     .transform(::routeMessage)
-                    .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) }
+                    .onErrorResume { logger.handleReactiveStreamError(clientContext, it) }
                     .doFinally { releaseBuffersMemory() }
                     .then()
 
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
new file mode 100644 (file)
index 0000000..21b79bb
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
+import reactor.core.publisher.Flux
+
+@Suppress("TooManyFunctions")
+internal object ClientContextLogging {
+    fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
+    fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
+    fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
+    fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
+    fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+
+    fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
+    fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
+    fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
+    fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
+    fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+
+    fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable,
+                                             returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+        return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux)
+    }
+}
+
index f6cb018..ec8593a 100644 (file)
@@ -22,8 +22,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
index 07ce760..690a7d1 100644 (file)
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
 import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
index 3fa05c4..6f02d43 100644 (file)
@@ -24,9 +24,9 @@ import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
 import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
index 349b078..b735138 100644 (file)
@@ -27,10 +27,11 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Flux.defer
 import reactor.core.publisher.SynchronousSink
 
 /**
@@ -46,7 +47,7 @@ internal class WireChunkDecoder(
         streamBuffer.release()
     }
 
-    fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
+    fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = defer {
         logIncomingMessage(byteBuf)
         if (byteBuf.readableBytes() == 0) {
             byteBuf.release()
@@ -54,7 +55,7 @@ internal class WireChunkDecoder(
         } else {
             streamBuffer.addComponent(true, byteBuf)
             generateFrames()
-                    .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) }
+                    .onErrorResume { logger.handleReactiveStreamError(ctx, it, Flux.error(it)) }
                     .doFinally { streamBuffer.discardReadComponents() }
         }
     }
index 213b743..305e4cb 100644 (file)
@@ -41,17 +41,3 @@ data class ClientContext(
         return result
     }
 }
-
-object ClientContextLogging {
-    fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
-    fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
-    fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
-    fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
-    fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
-
-    fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
-    fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
-    fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
-    fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
-    fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
-}
index 1e98f2f..95590d9 100644 (file)
@@ -34,7 +34,6 @@ fun <T> Logger.handleReactiveStreamError(
     return returnFlux
 }
 
-
 fun <T> Try<T>.filterFailedWithLog(logger: Logger,
                                    context: MappedDiagnosticContext,
                                    acceptedMsg: (T) -> String,