Simple OK case for HV-VES
[dcaegen2/services/sdk.git] / services / hv-ves-client / producer / ct / src / test / java / org / onap / dcaegen2 / services / sdk / services / hvves / client / producer / ct / DummyCollector.java
1 /*
2  * ============LICENSE_START=======================================================
3  * DCAEGEN2-SERVICES-SDK
4  * ================================================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
21
22 import io.netty.buffer.ByteBuf;
23 import java.net.InetSocketAddress;
24 import java.time.Duration;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.stream.IntStream;
29 import org.reactivestreams.Publisher;
30 import reactor.core.publisher.Flux;
31 import reactor.core.publisher.ReplayProcessor;
32 import reactor.netty.DisposableServer;
33 import reactor.netty.NettyInbound;
34 import reactor.netty.NettyOutbound;
35 import reactor.netty.tcp.TcpServer;
36 import reactor.util.function.Tuple2;
37
38 /**
39  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
40  */
41 public class DummyCollector {
42
43     private final List<ByteBuf> receivedData = Collections.synchronizedList(new ArrayList<>());
44     private DisposableServer server;
45     private ReplayProcessor<ClientDisconnected> clientDisconnected = ReplayProcessor.create();
46     private Flux<Integer> handledClientsCount = Flux.fromStream(IntStream.iterate(0, x -> x + 1).boxed())
47             .zipWith(clientDisconnected)
48             .map(Tuple2::getT1)
49             .share();
50
51     public InetSocketAddress start() {
52         server = TcpServer.create()
53                 .host("localhost")
54                 .port(6666)
55                 .wiretap(true)
56                 .handle(this::handleConnection)
57                 .bindNow();
58         return server.address();
59     }
60
61     public void stop() {
62         server.disposeNow();
63         server = null;
64     }
65
66     public void blockUntilFirstClientIsHandled(Duration timeout) {
67         handledClientsCount.blockFirst(timeout);
68     }
69
70     public void blockUntilFirstClientsAreHandled(int numClients, Duration timeout) {
71         handledClientsCount.take(numClients).blockLast(timeout);
72     }
73
74     public ByteBuf dataFromClient(int clientNumber) {
75         return receivedData.get(clientNumber);
76     }
77
78     public ByteBuf dataFromFirstClient() {
79         return dataFromClient(0);
80     }
81
82     private Publisher<Void> handleConnection(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
83         nettyInbound.receive()
84                 .aggregate()
85                 .retain()
86                 .log()
87                 .doOnNext(this::collect)
88                 .subscribe();
89
90         return nettyOutbound.neverComplete();
91     }
92
93     private void collect(ByteBuf buf) {
94         receivedData.add(buf);
95         clientDisconnected.onNext(ClientDisconnected.INSTANCE);
96     }
97
98
99     private static final class ClientDisconnected {
100
101         private static final ClientDisconnected INSTANCE = new ClientDisconnected();
102     }
103 }