Harmonize logging and add new logs
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / model / routing.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.model
21
22 import arrow.core.Option
23 import org.onap.dcae.collectors.veshv.utils.logging.Logger
24 import org.onap.ves.VesEventOuterClass.CommonEventHeader
25
26 data class Routing(val routes: List<Route>) {
27
28     fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
29             Option.fromNullable(routes.find { it.applies(commonHeader) }).also {
30                 if (it.isEmpty()) {
31                     logger.debug { "No route is defined for domain: ${commonHeader.domain}" }
32                 }
33             }
34
35     companion object {
36         private val logger = Logger(Routing::class)
37     }
38 }
39
40 data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
41
42     fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
43
44     operator fun invoke(message: VesMessage): RoutedMessage =
45             RoutedMessage(targetTopic, partitioning(message.header), message)
46 }
47
48
49 /*
50 Configuration DSL
51  */
52
53 fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder {
54     val conf = RoutingBuilder()
55     conf.init()
56     return conf
57 }
58
59 class RoutingBuilder {
60     private val routes: MutableList<RouteBuilder> = mutableListOf()
61
62     fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder {
63         val rule = RouteBuilder()
64         rule.init()
65         routes.add(rule)
66         return rule
67     }
68
69     fun build() = Routing(routes.map { it.build() }.toList())
70 }
71
72 class RouteBuilder {
73
74     private lateinit var domain: String
75     private lateinit var targetTopic: String
76     private lateinit var partitioning: (CommonEventHeader) -> Int
77
78     fun fromDomain(domain: String) {
79         this.domain = domain
80     }
81
82     fun toTopic(targetTopic: String) {
83         this.targetTopic = targetTopic
84     }
85
86     fun withFixedPartitioning(num: Int = 0) {
87         partitioning = { num }
88     }
89
90     fun build() = Route(domain, targetTopic, partitioning)
91
92 }