7d8f0cb107685cc46d385cbc4287c207ba29063e
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / HvVesCollector.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl
21
22 import io.netty.buffer.ByteBuf
23 import org.onap.dcae.collectors.veshv.boundary.Collector
24 import org.onap.dcae.collectors.veshv.boundary.Metrics
25 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
26 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
27 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
28 import org.onap.dcae.collectors.veshv.model.ClientContext
29 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
30 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
31 import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
32 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
33 import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
34 import org.onap.dcae.collectors.veshv.domain.VesMessage
35 import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
36 import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft
37 import org.onap.dcae.collectors.veshv.utils.logging.Logger
38 import org.onap.dcae.collectors.veshv.utils.logging.MessageEither
39 import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
40 import reactor.core.publisher.Flux
41 import reactor.core.publisher.Mono
42
43 /**
44  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
45  * @since May 2018
46  */
47 internal class HvVesCollector(
48         private val clientContext: ClientContext,
49         private val wireChunkDecoder: WireChunkDecoder,
50         private val protobufDecoder: VesDecoder,
51         private val router: Router,
52         private val metrics: Metrics) : Collector {
53
54     override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
55             dataStream
56                     .transform { decodeWireFrame(it) }
57                     .transform(::filterInvalidWireFrame)
58                     .transform(::decodeProtobufPayload)
59                     .transform(::filterInvalidProtobufMessages)
60                     .transform(::route)
61                     .handleErrors()
62                     .doFinally { releaseBuffersMemory() }
63                     .then()
64
65     private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
66             .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
67             .concatMap(wireChunkDecoder::decode)
68             .doOnNext(metrics::notifyMessageReceived)
69
70     private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
71             .filterFailedWithLog {
72                 MessageValidator
73                         .validateFrameMessage(it)
74                         .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
75             }
76
77     private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
78             .flatMap { frame ->
79                 protobufDecoder
80                         .decode(frame)
81                         .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
82                         .filterFailedWithLog(logger, clientContext::fullMdc,
83                                 { "Ves event header decoded successfully" },
84                                 { "Failed to decode ves event header, reason: ${it.message}" })
85             }
86
87     private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
88             .filterFailedWithLog {
89                 MessageValidator
90                         .validateProtobufMessage(it)
91                         .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
92             }
93
94     private fun route(flux: Flux<VesMessage>) = flux
95             .flatMap(router::route)
96             .doOnNext(this::updateSinkMetrics)
97
98     private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
99         when (consumedMessage) {
100             is SuccessfullyConsumedMessage ->
101                 metrics.notifyMessageSent(consumedMessage.message)
102             is FailedToConsumeMessage ->
103                 metrics.notifyMessageDropped(consumedMessage.cause)
104         }
105     }
106
107     private fun releaseBuffersMemory() = wireChunkDecoder.release()
108             .also { logger.debug { "Released buffer memory after handling message stream" } }
109
110     private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume {
111         metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
112         logger.handleReactiveStreamError(clientContext, it)
113     }
114
115     private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> =
116             filterFailedWithLog(logger, clientContext::fullMdc, predicate)
117
118     companion object {
119         private val logger = Logger(HvVesCollector::class)
120     }
121 }