Merge "Allow to specify JVM options by means of env-var"
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-utils / src / main / kotlin / org / onap / dcae / collectors / veshv / utils / logging / reactive_logging.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.utils.logging
21
22 import arrow.core.Either
23 import arrow.core.Option
24 import arrow.core.Try
25 import reactor.core.publisher.Flux
26 import reactor.core.publisher.Mono
27
28 fun <T> Logger.handleReactiveStreamError(
29         context: MappedDiagnosticContext,
30         ex: Throwable,
31         returnFlux: Flux<T> = Flux.empty()): Flux<T> {
32     warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" }
33     withDebug(context) { log("Detailed stack trace", ex) }
34     return returnFlux
35 }
36
37 fun <T> Try<T>.filterFailedWithLog(logger: Logger,
38                                    context: MappedDiagnosticContext,
39                                    acceptedMsg: (T) -> String,
40                                    rejectedMsg: (Throwable) -> String): Flux<T> =
41         fold({ ex ->
42             logger.withWarn(context) { log(rejectedMsg(ex)) }
43             Flux.empty<T>()
44         }, { obj ->
45             logger.trace(context) { acceptedMsg(obj) }
46             Flux.just(obj)
47         })
48
49 fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
50                                      context: MappedDiagnosticContext,
51                                      acceptedMsg: (T) -> String,
52                                      rejectedMsg: () -> String): Flux<T> =
53         fold({
54             logger.warn(context, rejectedMsg)
55             Flux.empty<T>()
56         }, {
57             logger.trace(context) { acceptedMsg(it) }
58             Flux.just(it)
59         })
60
61 fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
62                                     context: MappedDiagnosticContext,
63                                     predicate: (T) -> Either<() -> String, () -> String>) =
64         flatMap { t ->
65             predicate(t).fold({
66                 logger.warn(context, it)
67                 Mono.empty<T>()
68             }, {
69                 logger.trace(context, it)
70                 Mono.just<T>(t)
71             })
72         }