2 * ============LICENSE_START=======================================================
3 * DCAEGEN2-SERVICES-SDK
4 * ================================================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
22 import io.netty.buffer.ByteBuf;
24 import java.net.InetSocketAddress;
25 import java.time.Duration;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Optional;
30 import java.util.stream.IntStream;
32 import io.netty.handler.ssl.SslContext;
33 import org.reactivestreams.Publisher;
34 import reactor.core.publisher.Flux;
35 import reactor.core.publisher.ReplayProcessor;
36 import reactor.netty.DisposableServer;
37 import reactor.netty.NettyInbound;
38 import reactor.netty.NettyOutbound;
39 import reactor.netty.tcp.TcpServer;
40 import reactor.util.function.Tuple2;
43 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
// Minimal TCP server that records every buffer handed to collect(ByteBuf) and lets callers
// block until a given number of "client handled" signals has been observed.
45 public class DummyCollector {
// Optional TLS configuration supplied through the constructor; when present, start() builds the
// server with .secure(...), otherwise it uses a plain TcpServer.
46 private Optional<SslContext> sslContext;
// Buffers in the order collect(ByteBuf) received them; wrapped in a synchronized list and read
// back by index through dataFromClient(int).
48 private final List<ByteBuf> receivedData = Collections.synchronizedList(new ArrayList<>());
// Handle of the bound server; assigned in start() from bindNow().
49 private DisposableServer server;
// Receives one ClientDisconnected.INSTANCE signal each time collect(ByteBuf) runs.
// NOTE(review): the name implies "client disconnected", but the only visible emitter is collect();
// confirm that collect() is invoked exactly once per client connection.
50 private ReplayProcessor<ClientDisconnected> clientDisconnected = ReplayProcessor.create();
// Counter sequence 0, 1, 2, ... paired one-to-one with clientDisconnected signals, so it can
// advance at most once per signal. Used by the blockUntil* methods.
// NOTE(review): the rest of this operator chain (and its terminating ';') is not visible here;
// presumably the pair is mapped back to the Integer counter - confirm against the full file.
51 private Flux<Integer> handledClientsCount = Flux.fromStream(IntStream.iterate(0, x -> x + 1).boxed())
52 .zipWith(clientDisconnected)
/**
 * Creates a collector (package-private constructor).
 *
 * @param sslContext TLS context to secure the server with; when empty, start() creates an
 *                   unsecured server. The Optional itself is stored as-is and is dereferenced in
 *                   start(), so it must not be null.
 */
56 DummyCollector(Optional<SslContext> sslContext) {
57 this.sslContext = sslContext;
/**
 * Builds the TCP server, binds it and remembers the resulting handle in {@code server}.
 *
 * @return the address reported by the bound server
 */
60 public InetSocketAddress start() {
// Secured server when an SslContext was supplied, plain TcpServer otherwise.
// NOTE(review): the declaration of the local tcpServer and part of the builder chain are not
// visible here; presumably this expression is what gets assigned to tcpServer - confirm.
62 sslContext.map(context -> TcpServer.create()
63 .secure(ssl -> ssl.sslContext(context)))
64 .orElseGet(TcpServer::create)
// Every accepted connection is passed to handleConnection(...).
68 .handle(this::handleConnection);
69 server = tcpServer.bindNow();
70 return server.address();
/**
 * Blocks the calling thread until handledClientsCount emits its first element.
 *
 * @param timeout maximum time to wait; Reactor's blockFirst(Duration) throws a RuntimeException
 *                when it expires
 */
78 public void blockUntilFirstClientIsHandled(Duration timeout) {
79 handledClientsCount.blockFirst(timeout);
/**
 * Blocks the calling thread until handledClientsCount has emitted {@code numClients} elements
 * (the sequence is truncated with take(numClients) and then awaited with blockLast).
 *
 * @param numClients number of elements to wait for
 * @param timeout    maximum time to wait; Reactor's blockLast(Duration) throws a RuntimeException
 *                   when it expires
 */
82 public void blockUntilFirstClientsAreHandled(int numClients, Duration timeout) {
83 handledClientsCount.take(numClients).blockLast(timeout);
/**
 * Returns the buffer stored at the given position of receivedData.
 *
 * @param clientNumber zero-based index into receivedData, i.e. the order in which buffers were
 *                     collected (NOTE(review): equals the client's ordinal only if each client
 *                     contributes exactly one buffer - confirm)
 * @return the buffer recorded at that index
 * @throws IndexOutOfBoundsException if no buffer has been recorded at that index
 */
86 public ByteBuf dataFromClient(int clientNumber) {
87 return receivedData.get(clientNumber);
/**
 * Convenience shortcut for {@code dataFromClient(0)}.
 *
 * @return the first buffer that was recorded
 */
90 public ByteBuf dataFromFirstClient() {
91 return dataFromClient(0);
/**
 * Connection handler registered in start(): routes what the connection receives to
 * collect(ByteBuf) and keeps the outbound side open.
 *
 * @param nettyInbound  inbound side of the accepted connection
 * @param nettyOutbound outbound side of the accepted connection
 * @return the publisher produced by {@code nettyOutbound.neverComplete()}
 */
94 private Publisher<Void> handleConnection(NettyInbound nettyInbound, NettyOutbound nettyOutbound) {
// Side pipeline over the inbound data; each element reaching doOnNext is recorded by collect().
// NOTE(review): the operators between receive() and doOnNext, and the call that terminates and
// subscribes this chain, are not visible here - confirm how buffers are aggregated/retained.
95 nettyInbound.receive()
99 .doOnNext(this::collect)
102 return nettyOutbound.neverComplete();
/**
 * Records a received buffer and signals that one more client has been handled.
 *
 * @param buf buffer to append to receivedData
 */
105 private void collect(ByteBuf buf) {
106 receivedData.add(buf);
// Each signal lets handledClientsCount advance by one, which releases the blockUntil* waiters.
107 clientDisconnected.onNext(ClientDisconnected.INSTANCE);
// Marker type pushed into the clientDisconnected processor; it carries no data, so a single
// shared INSTANCE is reused for every signal.
111 private static final class ClientDisconnected {
113 private static final ClientDisconnected INSTANCE = new ClientDisconnected();