Bump checkstyle version
[dcaegen2/collectors/hv-ves.git] / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / VesHvCollector.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl
21
22 import arrow.core.Option
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.ByteBufAllocator
25 import org.onap.dcae.collectors.veshv.boundary.Collector
26 import org.onap.dcae.collectors.veshv.boundary.Metrics
27 import org.onap.dcae.collectors.veshv.boundary.Sink
28 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
29 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
30 import org.onap.dcae.collectors.veshv.model.RoutedMessage
31 import org.onap.dcae.collectors.veshv.model.VesMessage
32 import org.onap.dcae.collectors.veshv.utils.logging.Logger
33 import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
34 import reactor.core.publisher.Flux
35 import reactor.core.publisher.Mono
36
37 /**
38  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
39  * @since May 2018
40  */
41 internal class VesHvCollector(
42         private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
43         private val protobufDecoder: VesDecoder,
44         private val router: Router,
45         private val sink: Sink,
46         private val metrics: Metrics) : Collector {
47
48     override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
49             wireChunkDecoderSupplier(alloc).let { wireDecoder ->
50                 dataStream
51                         .transform { decodeWireFrame(it, wireDecoder) }
52                         .filter(WireFrameMessage::isValid)
53                         .transform(::decodePayload)
54                         .filter(VesMessage::isValid)
55                         .transform(::routeMessage)
56                         .onErrorResume { logger.handleReactiveStreamError(it) }
57                         .doFinally { releaseBuffersMemory(wireDecoder) }
58                         .then()
59             }
60
61     private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
62             .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
63             .concatMap(decoder::decode)
64             .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
65
66     private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
67             .map(WireFrameMessage::payload)
68             .map(protobufDecoder::decode)
69             .flatMap { omitWhenNone(it) }
70
71     private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
72             .flatMap(this::findRoute)
73             .compose(sink::send)
74             .doOnNext { metrics.notifyMessageSent(it.topic) }
75
76
77     private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
78
79     private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
80             { Mono.empty() },
81             { Mono.just(it) })
82
83     private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
84
85     companion object {
86         private val logger = Logger(VesHvCollector::class)
87     }
88 }