HV VES Collector seed code
[dcaegen2/collectors/hv-ves.git] / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / domain / routing.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.domain
21
22 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
23 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
24
25 data class Routing(val routes: List<Route>) {
26
27     fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) }
28 }
29
30 data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
31
32     fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
33
34     operator fun invoke(message: VesMessage): RoutedMessage =
35             RoutedMessage(targetTopic, partitioning(message.header), message)
36 }
37
38
39 /*
40 Configuration DSL
41  */
42
43 fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder {
44     val conf = RoutingBuilder()
45     conf.init()
46     return conf
47 }
48
49 class RoutingBuilder {
50     private val routes: MutableList<RouteBuilder> = mutableListOf()
51
52     fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder {
53         val rule = RouteBuilder()
54         rule.init()
55         routes.add(rule)
56         return rule
57     }
58
59     fun build() = Routing(routes.map { it.build() }.toList())
60 }
61
62 class RouteBuilder {
63
64     private lateinit var domain: Domain
65     private lateinit var targetTopic: String
66     private lateinit var partitioning: (CommonEventHeader) -> Int
67
68     fun fromDomain(domain: Domain) {
69         this.domain = domain
70     }
71
72     fun toTopic(targetTopic: String) {
73         this.targetTopic = targetTopic
74     }
75
76     fun withFixedPartitioning(num: Int = 1) {
77         partitioning = { _ -> num }
78     }
79
80     fun build() = Route(domain, targetTopic, partitioning)
81
82 }