Implement simple health check mechanism
[dcaegen2/collectors/hv-ves.git] / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / fakes / configuration.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.fakes
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
24 import org.onap.dcae.collectors.veshv.model.routing
25 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
26 import reactor.core.publisher.FluxProcessor
27 import reactor.core.publisher.UnicastProcessor
28 import reactor.retry.RetryExhaustedException
29
30
31 const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
32 const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
33 const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas"
34
35 val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
36         kafkaBootstrapServers = "localhost:9969",
37         routing = routing {
38             defineRoute {
39                 fromDomain(Domain.HVRANMEAS)
40                 toTopic(HVRANMEAS_TOPIC)
41                 withFixedPartitioning()
42             }
43         }.build()
44 )
45
46 val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
47         kafkaBootstrapServers = "localhost:9969",
48         routing = routing {
49             defineRoute {
50                 fromDomain(Domain.HVRANMEAS)
51                 toTopic(HVRANMEAS_TOPIC)
52                 withFixedPartitioning()
53             }
54             defineRoute {
55                 fromDomain(Domain.HEARTBEAT)
56                 toTopic(HVRANMEAS_TOPIC)
57                 withFixedPartitioning()
58             }
59             defineRoute {
60                 fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING)
61                 toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
62                 withFixedPartitioning()
63             }
64         }.build()
65 )
66
67
68 val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
69         kafkaBootstrapServers = "localhost:9969",
70         routing = routing {
71             defineRoute {
72                 fromDomain(Domain.HVRANMEAS)
73                 toTopic(ALTERNATE_HVRANMEAS_TOPIC)
74                 withFixedPartitioning()
75             }
76         }.build()
77 )
78
79
80 val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
81         kafkaBootstrapServers = "localhost:9969",
82         routing = routing {
83         }.build()
84 )
85
86 class FakeConfigurationProvider : ConfigurationProvider {
87     private var shouldThrowException = false
88     private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
89
90     fun updateConfiguration(collectorConfiguration: CollectorConfiguration) =
91             if (shouldThrowException) {
92                 configStream.onError(RetryExhaustedException("I'm so tired"))
93             } else {
94                 configStream.onNext(collectorConfiguration)
95             }
96
97
98     fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
99         this.shouldThrowException = shouldThrowException
100     }
101
102     override fun invoke() = configStream
103 }