fec713add85fcca2d114b190f60df4a5dce04a3a
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / Router.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl
21
22 import arrow.core.None
23 import arrow.core.toOption
24 import org.onap.dcae.collectors.veshv.boundary.Metrics
25 import org.onap.dcae.collectors.veshv.boundary.Sink
26 import org.onap.dcae.collectors.veshv.boundary.SinkFactory
27 import org.onap.dcae.collectors.veshv.config.api.model.Route
28 import org.onap.dcae.collectors.veshv.config.api.model.Routing
29 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
30 import org.onap.dcae.collectors.veshv.domain.VesMessage
31 import org.onap.dcae.collectors.veshv.model.ClientContext
32 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
33 import org.onap.dcae.collectors.veshv.model.MessageDropCause
34 import org.onap.dcae.collectors.veshv.utils.logging.Logger
35 import org.onap.ves.VesEventOuterClass.CommonEventHeader
36 import reactor.core.publisher.Flux
37
38 class Router internal constructor(private val routing: Routing,
39                                   private val messageSinks: Map<String, Lazy<Sink>>,
40                                   private val ctx: ClientContext,
41                                   private val metrics: Metrics) {
42     constructor(routing: Routing,
43                 sinkFactory: SinkFactory,
44                 ctx: ClientContext,
45                 metrics: Metrics) :
46             this(routing, constructMessageSinks(routing, sinkFactory, ctx), ctx, metrics) {
47         logger.debug(ctx::mdc) { "Routing configuration for client: $routing" }
48     }
49
50     fun route(message: VesMessage): Flux<ConsumedMessage> =
51             routeFor(message.header)
52                     .fold({ routeNotFound(message) }, { routeFound(message, it) })
53                     .flatMap {
54                         val sinkTopic = it.sink.topicName()
55                         messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
56                     }
57
58     private fun routeNotFound(message: VesMessage): Flux<Route> {
59         metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
60         logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
61         logger.trace(ctx::fullMdc) { "Routing available for client: $routing" }
62         return Flux.empty<Route>()
63     }
64
65     private fun routeFound(message: VesMessage, route: Route): Flux<Route> {
66         logger.trace(ctx::fullMdc) {
67             "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION"
68         }
69         return Flux.just(route)
70     }
71
72
73     private fun routeFor(header: CommonEventHeader) =
74             routing.find { it.domain == header.domain }.toOption()
75
76     private fun messageSinkFor(sinkTopic: String) = messageSinks
77             .getOrElse(sinkTopic) {
78                 throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic")
79             }
80
81     companion object {
82         private val logger = Logger(Router::class)
83         private val NONE_PARTITION = None
84
85         internal fun constructMessageSinks(routing: Routing,
86                                            sinkFactory: SinkFactory,
87                                            ctx: ClientContext) =
88                 routing.map(Route::sink)
89                         .distinctBy { it.topicName() }
90                         .associateBy({ it.topicName() }, { sinkFactory(it, ctx) })
91     }
92
93     private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
94 }
95
96 internal class MissingMessageSinkException(msg: String) : Throwable(msg)