Add stndDefined domain to HV-VES
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / Router.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2021 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl
21
22 import arrow.core.None
23 import arrow.core.Option
24 import arrow.core.toOption
25 import org.onap.dcae.collectors.veshv.boundary.Metrics
26 import org.onap.dcae.collectors.veshv.boundary.Sink
27 import org.onap.dcae.collectors.veshv.boundary.SinkFactory
28 import org.onap.dcae.collectors.veshv.config.api.model.Route
29 import org.onap.dcae.collectors.veshv.config.api.model.Routing
30 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
32 import org.onap.dcae.collectors.veshv.domain.VesMessage
33 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
34 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
35 import org.onap.dcae.collectors.veshv.model.MessageDropCause
36 import org.onap.dcae.collectors.veshv.utils.logging.Logger
37 import org.onap.ves.VesEventOuterClass.CommonEventHeader
38 import reactor.core.publisher.Flux
39
40 internal class Router internal constructor(private val routing: Routing,
41                                            private val messageSinks: Map<String, Lazy<Sink>>,
42                                            private val ctx: ClientContext,
43                                            private val metrics: Metrics) {
44     constructor(routing: Routing,
45                 sinkFactory: SinkFactory,
46                 ctx: ClientContext,
47                 metrics: Metrics) :
48             this(routing, constructMessageSinks(routing, sinkFactory, ctx), ctx, metrics) {
49         logger.debug(ctx::mdc) { "Routing configuration for client: $routing" }
50     }
51
52     fun route(message: VesMessage): Flux<ConsumedMessage> =
53             routeFor(message.header)
54                     .fold({ routeNotFound(message) }, { routeFound(message, it) })
55                     .flatMap {
56                         val sinkTopic = it.sink.topicName()
57                         messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
58                     }
59
60     private fun routeNotFound(message: VesMessage): Flux<Route> {
61         metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
62         logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
63         logger.trace(ctx::fullMdc) { "Routing available for client: $routing" }
64         return Flux.empty<Route>()
65     }
66
67     private fun routeFound(message: VesMessage, route: Route): Flux<Route> {
68         logger.trace(ctx::fullMdc) {
69             "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION"
70         }
71         return Flux.just(route)
72     }
73
74
75     private fun routeFor(header: CommonEventHeader): Option<Route> =
76             routing.find {
77                 if (header.domain == VesEventDomain.STND_DEFINED.domainName)
78                     it.domain == header.stndDefinedNamespace
79                 else {
80                     it.domain == header.domain
81                 }
82             }.toOption()
83
84     private fun messageSinkFor(sinkTopic: String) = messageSinks
85             .getOrElse(sinkTopic) {
86                 throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic")
87             }
88
89     companion object {
90         private val logger = Logger(Router::class)
91         private val NONE_PARTITION = None
92
93         internal fun constructMessageSinks(routing: Routing,
94                                            sinkFactory: SinkFactory,
95                                            ctx: ClientContext) =
96                 routing.map(Route::sink)
97                         .distinctBy { it.topicName() }
98                         .associateBy({ it.topicName() }, { sinkFactory(it, ctx) })
99     }
100
101     private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
102 }
103
104 internal class MissingMessageSinkException(msg: String) : Throwable(msg)