/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018-2021 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.onap.dcae.collectors.veshv.impl
22 import arrow.core.None
23 import arrow.core.Option
24 import arrow.core.toOption
25 import org.onap.dcae.collectors.veshv.boundary.Metrics
26 import org.onap.dcae.collectors.veshv.boundary.Sink
27 import org.onap.dcae.collectors.veshv.boundary.SinkFactory
28 import org.onap.dcae.collectors.veshv.config.api.model.Route
29 import org.onap.dcae.collectors.veshv.config.api.model.Routing
30 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
32 import org.onap.dcae.collectors.veshv.domain.VesMessage
33 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
34 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
35 import org.onap.dcae.collectors.veshv.model.MessageDropCause
36 import org.onap.dcae.collectors.veshv.utils.logging.Logger
37 import org.onap.ves.VesEventOuterClass.CommonEventHeader
38 import reactor.core.publisher.Flux
/**
 * Selects a [Route] for an incoming [VesMessage] and forwards the message to the sink
 * registered for that route's topic.
 *
 * @property routing routes available to the client; searched in iteration order.
 * @property messageSinks lazily created sinks keyed by topic name.
 * @property ctx client context supplying the MDC used for logging.
 * @property metrics collector notified when a message is dropped because no route matches.
 */
internal class Router internal constructor(private val routing: Routing,
                                           private val messageSinks: Map<String, Lazy<Sink>>,
                                           private val ctx: ClientContext,
                                           private val metrics: Metrics) {

    /**
     * Builds the topic-to-sink map from [routing] using [sinkFactory] and logs the
     * routing configuration at debug level.
     */
    constructor(routing: Routing,
                sinkFactory: SinkFactory,
                ctx: ClientContext,
                metrics: Metrics) :
            this(routing, constructMessageSinks(routing, sinkFactory, ctx), ctx, metrics) {
        logger.debug(ctx::mdc) { "Routing configuration for client: $routing" }
    }

    /**
     * Routes [message] according to its common event header.
     *
     * When no route matches, the drop is reported to [metrics] and an empty [Flux] is returned.
     * When a route matches, the message is wrapped in a [RoutedMessage] (with no partition
     * assigned) and handed to the sink registered for the route's topic.
     *
     * A [MissingMessageSinkException] is raised from within the returned stream's mapping step
     * when a route is found but no sink is registered for its topic.
     */
    fun route(message: VesMessage): Flux<ConsumedMessage> =
            routeFor(message.header)
                    .fold({ routeNotFound(message) }, { routeFound(message, it) })
                    .flatMap { route ->
                        val sinkTopic = route.sink.topicName()
                        messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
                    }

    /** Reports the dropped [message] to metrics and logs, then yields an empty stream. */
    private fun routeNotFound(message: VesMessage): Flux<Route> {
        metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
        logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
        logger.trace(ctx::fullMdc) { "Routing available for client: $routing" }
        return Flux.empty()
    }

    /** Logs the match at trace level and wraps [route] in a single-element stream. */
    private fun routeFound(message: VesMessage, route: Route): Flux<Route> {
        logger.trace(ctx::fullMdc) {
            "Found route for message ${message.header}: $route. Assigned partition: $NONE_PARTITION"
        }
        return Flux.just(route)
    }

    /**
     * Finds the first route whose domain matches [header].
     *
     * For headers in the STND_DEFINED domain the route is matched against the header's
     * stndDefinedNamespace; for every other domain it is matched against the domain itself.
     */
    private fun routeFor(header: CommonEventHeader): Option<Route> {
        // The key depends only on the header, so it is computed once instead of per route.
        val routingKey =
                if (header.domain == VesEventDomain.STND_DEFINED.domainName) header.stndDefinedNamespace
                else header.domain
        return routing.find { it.domain == routingKey }.toOption()
    }

    /**
     * Looks up the sink registered for [sinkTopic].
     *
     * @throws MissingMessageSinkException when no sink is registered under that topic.
     */
    private fun messageSinkFor(sinkTopic: String): Lazy<Sink> = messageSinks
            .getOrElse(sinkTopic) {
                throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic")
            }

    /** Resolves the lazily held sink and sends [message] through it. */
    private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)

    companion object {
        private val logger = Logger(Router::class)
        private val NONE_PARTITION = None

        /**
         * Creates one sink per distinct topic name found in [routing], keyed by that topic name.
         */
        internal fun constructMessageSinks(routing: Routing,
                                           sinkFactory: SinkFactory,
                                           ctx: ClientContext): Map<String, Lazy<Sink>> =
                routing.map(Route::sink)
                        // De-duplicate first so the factory is invoked once per topic, not once per route.
                        .distinctBy { it.topicName() }
                        .associateBy({ it.topicName() }, { sinkFactory(it, ctx) })
    }
}
/**
 * Signals that a message was routed to a topic for which no sink is registered.
 *
 * @param msg description of the missing sink, exposed as the throwable's message.
 */
internal class MissingMessageSinkException(msg: String) : Throwable(msg)