Creation of server module
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / Sut.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import io.netty.buffer.ByteBuf
23 import io.netty.buffer.ByteBufAllocator
24 import io.netty.buffer.UnpooledByteBufAllocator
25 import org.onap.dcae.collectors.veshv.boundary.Collector
26 import org.onap.dcae.collectors.veshv.boundary.Sink
27 import org.onap.dcae.collectors.veshv.boundary.SinkFactory
28 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
29 import org.onap.dcae.collectors.veshv.config.api.model.Routing
30 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
31 import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
32 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
33 import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
34 import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
35 import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
37 import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
38 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
39 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
40 import org.onap.dcae.collectors.veshv.utils.Closeable
41 import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
42 import reactor.core.publisher.Flux
43 import reactor.core.publisher.Mono
44 import java.time.Duration
45 import java.util.concurrent.atomic.AtomicBoolean
46
47 /**
48  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
49  * @since May 2018
50  */
51 class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable {
52     val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
53     val metrics = FakeMetrics()
54     val sinkProvider = DummySinkFactory(sink)
55
56     private val collectorProvider = HvVesCollectorFactory(
57             configuration,
58             sinkProvider,
59             metrics
60     )
61
62     val collector: Collector
63         get() = collectorProvider(ClientContext(alloc))
64
65
66     fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
67         collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT)
68         return sink.sentMessages
69     }
70
71     fun handleConnection(vararg packets: ByteBuf) {
72         collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT)
73     }
74
75     override fun close() = collectorProvider.close()
76
77     companion object {
78         private val TIMEOUT = Duration.ofSeconds(10)
79     }
80 }
81
82 class DummySinkFactory(private val sink: Sink) : SinkFactory {
83     private val sinkInitialized = AtomicBoolean(false)
84
85     override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
86         sinkInitialized.set(true)
87         sink
88     }
89
90     override fun close() =
91             if (sinkInitialized.get()) {
92                 sink.close()
93             } else {
94                 Mono.empty()
95             }
96 }
97
98 fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
99         Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysSuccessfulSink())
100
101 fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
102         Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysFailingSink())
103
104 fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
105         Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), DelayingSink(delay))