2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.utils.logging
 
  22 import arrow.core.Either
 
  23 import arrow.core.Option
 
  25 import reactor.core.publisher.Flux
 
  26 import reactor.core.publisher.Mono
 
  28 typealias MessageEither = Either<() -> String, () -> String>
 
  30 fun <T> Logger.handleReactiveStreamError(
 
  31         context: MappedDiagnosticContext,
 
  33         returnFlux: Flux<T> = Flux.empty()): Flux<T> {
 
  34     warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" }
 
  35     withDebug(context) { log("Detailed stack trace", ex) }
 
  39 fun <T> Try<T>.filterFailedWithLog(logger: Logger,
 
  40                                    context: MappedDiagnosticContext,
 
  41                                    acceptedMsg: (T) -> String,
 
  42                                    rejectedMsg: (Throwable) -> String): Flux<T> =
 
  44             logger.withWarn(context) { log(rejectedMsg(ex)) }
 
  47             logger.trace(context) { acceptedMsg(obj) }
 
  51 fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
 
  52                                      context: MappedDiagnosticContext,
 
  53                                      acceptedMsg: (T) -> String,
 
  54                                      rejectedMsg: () -> String): Flux<T> =
 
  56             logger.warn(context, rejectedMsg)
 
  59             logger.trace(context) { acceptedMsg(it) }
 
  63 fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
 
  64                                     context: MappedDiagnosticContext,
 
  65                                     predicate: (T) -> MessageEither): Flux<T> =
 
  68                 logger.warn(context, it)
 
  71                 logger.trace(context, it)