2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018-2019 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.config.api.model
 
  22 import arrow.core.Option
 
  23 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 
  24 import org.onap.dcae.collectors.veshv.domain.VesMessage
 
  25 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
  27 data class Routing(val routes: List<Route>) {
 
  29     fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
 
  30             Option.fromNullable(routes.find { it.applies(commonHeader) })
 
  33 data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) {
 
  35     fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
 
  37     operator fun invoke(message: VesMessage): RoutedMessage =
 
  38             RoutedMessage(targetTopic, partitioning(message.header), message)
 
  43 HvVesConfiguration DSL
 
  46 fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init)
 
  48 class RoutingBuilder {
 
  49     private val routes: MutableList<RouteBuilder> = mutableListOf()
 
  51     fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder()
 
  53             .also { routes.add(it) }
 
  55     fun build() = Routing(routes.map { it.build() }.toList())
 
  60     private lateinit var domain: String
 
  61     private lateinit var targetTopic: String
 
  62     private lateinit var partitioning: (CommonEventHeader) -> Int
 
  64     fun fromDomain(domain: String): RouteBuilder = apply {
 
  68     fun toTopic(targetTopic: String): RouteBuilder = apply {
 
  69         this.targetTopic = targetTopic
 
  72     fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply {
 
  73         partitioning = { num }
 
  76     fun build() = Route(domain, targetTopic, partitioning)