Bump checkstyle version
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / fakes / configuration.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.fakes
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
24 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
25 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
26 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
27 import org.onap.dcae.collectors.veshv.model.routing
28
29 import reactor.core.publisher.FluxProcessor
30 import reactor.core.publisher.UnicastProcessor
31 import reactor.retry.RetryExhaustedException
32
33
/** Topic name that the PERF3GPP route of [basicConfiguration] points to. */
const val PERF3GPP_TOPIC: String = "HV_VES_PERF3GPP"

/** Topic name that the MEASUREMENT route of [twoDomainsToOneTopicConfiguration] points to. */
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC: String = "HV_VES_MEAS_FOR_VF_SCALING"

/** Topic name that the PERF3GPP route of [configurationWithDifferentRouting] points to. */
const val ALTERNATE_PERF3GPP_TOPIC: String = "HV_VES_PERF3GPP_ALTERNATIVE"
37
/**
 * Collector configuration with a single route: events from the PERF3GPP domain
 * go to [PERF3GPP_TOPIC] using fixed partitioning.
 */
val basicConfiguration: CollectorConfiguration = routing {
    defineRoute {
        fromDomain(PERF3GPP.domainName)
        toTopic(PERF3GPP_TOPIC)
        withFixedPartitioning()
    }
}.build().let { routes ->
    CollectorConfiguration(
            kafkaBootstrapServers = "localhost:9969",
            routing = routes
    )
}
48
/**
 * Collector configuration in which both the PERF3GPP and HEARTBEAT domains are
 * routed to [PERF3GPP_TOPIC], while the MEASUREMENT domain is routed to
 * [MEASUREMENTS_FOR_VF_SCALING_TOPIC]. Every route uses fixed partitioning.
 */
val twoDomainsToOneTopicConfiguration: CollectorConfiguration = routing {
    // Two different domains sharing the same target topic.
    defineRoute {
        fromDomain(PERF3GPP.domainName)
        toTopic(PERF3GPP_TOPIC)
        withFixedPartitioning()
    }
    defineRoute {
        fromDomain(HEARTBEAT.domainName)
        toTopic(PERF3GPP_TOPIC)
        withFixedPartitioning()
    }
    // A third domain with its own, separate topic.
    defineRoute {
        fromDomain(MEASUREMENT.domainName)
        toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
        withFixedPartitioning()
    }
}.build().let { routes ->
    CollectorConfiguration(
            kafkaBootstrapServers = "localhost:9969",
            routing = routes
    )
}
69
70
/**
 * Collector configuration that routes the PERF3GPP domain to
 * [ALTERNATE_PERF3GPP_TOPIC] (instead of [PERF3GPP_TOPIC]) using fixed partitioning.
 */
val configurationWithDifferentRouting: CollectorConfiguration = routing {
    defineRoute {
        fromDomain(PERF3GPP.domainName)
        toTopic(ALTERNATE_PERF3GPP_TOPIC)
        withFixedPartitioning()
    }
}.build().let { routes ->
    CollectorConfiguration(
            kafkaBootstrapServers = "localhost:9969",
            routing = routes
    )
}
81
82
/** Collector configuration whose routing is built without defining any route. */
val configurationWithoutRouting: CollectorConfiguration = routing {
    // Intentionally empty: no routes are defined.
}.build().let { routes ->
    CollectorConfiguration(
            kafkaBootstrapServers = "localhost:9969",
            routing = routes
    )
}
88
/**
 * Test double for [ConfigurationProvider].
 *
 * Configurations are pushed manually through [updateConfiguration] and exposed
 * to the code under test via [invoke]. The fake can be switched into a failing
 * mode with [shouldThrowExceptionOnConfigUpdate], in which case the next update
 * signals an error on the stream instead of emitting a configuration.
 */
class FakeConfigurationProvider : ConfigurationProvider {
    private val configurations: FluxProcessor<CollectorConfiguration, CollectorConfiguration> =
            UnicastProcessor.create()
    private var failOnUpdate = false

    /** Returns the stream fed by [updateConfiguration]. */
    override fun invoke(): FluxProcessor<CollectorConfiguration, CollectorConfiguration> = configurations

    /**
     * Selects whether subsequent [updateConfiguration] calls signal an error
     * (`true`) or emit the supplied configuration (`false`).
     */
    fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
        failOnUpdate = shouldThrowException
    }

    /**
     * Emits [collectorConfiguration] on the stream, or signals a
     * [RetryExhaustedException] on it when the failing mode is enabled.
     */
    fun updateConfiguration(collectorConfiguration: CollectorConfiguration) {
        if (failOnUpdate) {
            configurations.onError(RetryExhaustedException("I'm so tired"))
            return
        }
        configurations.onNext(collectorConfiguration)
    }
}