/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018-2019 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.dcae.collectors.veshv.tests.fakes

import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

/**
 * Fake [Sink] that remembers every message flowing through [send] and reports
 * each of them as a [SuccessfullyConsumedMessage].
 *
 * Messages are appended to the tail of a [ConcurrentLinkedDeque] as they are
 * emitted, so nothing is recorded until the stream returned by [send] is
 * actually consumed.
 *
 * @author Piotr Jaszczyk
 * @since May 2018
 */
class StoringSink : Sink {
    private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
    private val active = AtomicBoolean(true)

    /** `true` once the [Mono] returned by [close] has been run. */
    val closed: Boolean
        get() = !active.get()

    /** Snapshot copy of the messages recorded so far, oldest first. */
    val sentMessages: List<RoutedMessage>
        get() = sent.toList()

    /**
     * Records each incoming message and maps it to a [SuccessfullyConsumedMessage].
     *
     * @param messages stream of routed messages to "send"
     * @return stream with one successful result per input message
     */
    override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
            messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)

    /**
     * @return a [Mono] that flips this sink into the closed state when it is run;
     * merely calling this method does not change [closed].
     */
    override fun close(): Mono<Void> = Mono.fromRunnable {
        active.set(false)
    }
}

/**
 * Fake [Sink] that only counts the messages flowing through [send] and reports
 * each of them as a [SuccessfullyConsumedMessage].
 *
 * @author Piotr Jaszczyk
 * @since May 2018
 */
class CountingSink : Sink {
    private val atomicCount = AtomicLong(0)

    /** Number of messages emitted so far through streams returned by [send]. */
    val count: Long
        get() = atomicCount.get()

    /**
     * Increments [count] for each incoming message and maps the message to a
     * [SuccessfullyConsumedMessage].
     *
     * @param messages stream of routed messages to "send"
     * @return stream with one successful result per input message
     */
    override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
            messages
                    .doOnNext { atomicCount.incrementAndGet() }
                    .map(::SuccessfullyConsumedMessage)
}

/**
 * Base fake [Sink] whose [send] simply applies the supplied [transformer] to the
 * incoming stream.
 *
 * @param transformer function turning the stream of routed messages into the
 * stream of consumption results
 */
open class ProcessingSink(
        private val transformer: (Flux<RoutedMessage>) -> Publisher<ConsumedMessage>
) : Sink {
    /**
     * @param messages stream of routed messages to "send"
     * @return the result of applying the transformer to [messages]
     */
    override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
            messages.transform(transformer)
}

/** [ProcessingSink] that reports every message as a [SuccessfullyConsumedMessage]. */
class AlwaysSuccessfulSink : ProcessingSink({ it.map(::SuccessfullyConsumedMessage) })

/**
 * [ProcessingSink] that reports every message as a [FailedToConsumeMessage] built
 * with a `null` second argument and [MessageDropCause.KAFKA_FAILURE].
 */
class AlwaysFailingSink : ProcessingSink({ stream ->
    stream.map { FailedToConsumeMessage(it, null, MessageDropCause.KAFKA_FAILURE) }
})

/**
 * [ProcessingSink] that delays each message via `delayElements` before reporting
 * it as a [SuccessfullyConsumedMessage].
 *
 * @param delay delay applied to every element of the stream
 */
class DelayingSink(delay: Duration) : ProcessingSink({
    it.delayElements(delay).map(::SuccessfullyConsumedMessage)
})