70e9cdff09a4dc547cdd890c48c70d05d9498acf
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * DCAEGEN2-SERVICES-SDK
4  * ================================================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
21
22 import io.netty.buffer.ByteBuf;
23
24 import java.net.InetSocketAddress;
25 import java.time.Duration;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Optional;
30 import java.util.stream.IntStream;
31
32 import io.netty.handler.ssl.SslContext;
33 import org.reactivestreams.Publisher;
34 import reactor.core.publisher.Flux;
35 import reactor.core.publisher.ReplayProcessor;
36 import reactor.netty.DisposableServer;
37 import reactor.netty.NettyInbound;
38 import reactor.netty.NettyOutbound;
39 import reactor.netty.tcp.TcpServer;
40 import reactor.util.function.Tuple2;
41
42 /**
43  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
44  */
45 public class DummyCollector {
46     private Optional<SslContext> sslContext;
47
48     private final List<ByteBuf> receivedData = Collections.synchronizedList(new ArrayList<>());
49     private DisposableServer server;
50     private ReplayProcessor<ClientDisconnected> clientDisconnected = ReplayProcessor.create();
51     private Flux<Integer> handledClientsCount = Flux.fromStream(IntStream.iterate(0, x -> x + 1).boxed())
52             .zipWith(clientDisconnected)
53             .map(Tuple2::getT1)
54             .share();
55
56     DummyCollector(Optional<SslContext> sslContext) {
57         this.sslContext = sslContext;
58     }
59
60     public InetSocketAddress start() {
61         TcpServer tcpServer =
62                 sslContext.map(context -> TcpServer.create()
63                         .secure(ssl -> ssl.sslContext(context)))
64                         .orElseGet(TcpServer::create)
65                         .host("localhost")
66                         .port(6666)
67                         .wiretap(true)
68                         .handle(this::handleConnection);
69         server = tcpServer.bindNow();
70         return server.address();
71     }
72
73     public void stop() {
74         server.disposeNow();
75         server = null;
76     }
77
78     public void blockUntilFirstClientIsHandled(Duration timeout) {
79         handledClientsCount.blockFirst(timeout);
80     }
81
82     public void blockUntilFirstClientsAreHandled(int numClients, Duration timeout) {
83         handledClientsCount.take(numClients).blockLast(timeout);
84     }
85
86     public ByteBuf dataFromClient(int clientNumber) {
87         return receivedData.get(clientNumber);
88     }
89
90     public ByteBuf dataFromFirstClient() {
91         return dataFromClient(0);
92     }
93
94     private Publisher<Void> handleConnection(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
95         nettyInbound.receive()
96                 .aggregate()
97                 .retain()
98                 .log()
99                 .doOnNext(this::collect)
100                 .subscribe();
101
102         return nettyOutbound.neverComplete();
103     }
104
105     private void collect(ByteBuf buf) {
106         receivedData.add(buf);
107         clientDisconnected.onNext(ClientDisconnected.INSTANCE);
108     }
109
110
111     private static final class ClientDisconnected {
112
113         private static final ClientDisconnected INSTANCE = new ClientDisconnected();
114     }
115 }