package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
import io.netty.buffer.ByteBuf;
+
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.stream.IntStream;
+
+import io.netty.handler.ssl.SslContext;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
*/
public class DummyCollector {
+ private Optional<SslContext> sslContext;
private final List<ByteBuf> receivedData = Collections.synchronizedList(new ArrayList<>());
private DisposableServer server;
.map(Tuple2::getT1)
.share();
+ DummyCollector(Optional<SslContext> sslContext) {
+ this.sslContext = sslContext;
+ }
+
public InetSocketAddress start() {
- server = TcpServer.create()
- .host("localhost")
- .port(6666)
- .wiretap(true)
- .handle(this::handleConnection)
- .bindNow();
+ TcpServer tcpServer =
+ sslContext.map(context -> TcpServer.create()
+ .secure(ssl -> ssl.sslContext(context)))
+ .orElseGet(TcpServer::create)
+ .host("localhost")
+ .port(6666)
+ .wiretap(true)
+ .handle(this::handleConnection);
+ server = tcpServer.bindNow();
return server.address();
}
import io.netty.buffer.ByteBuf;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
import org.onap.ves.MeasDataCollectionOuterClass;
import org.onap.ves.VesEventOuterClass.VesEvent;
import reactor.core.publisher.Flux;
+import java.time.Duration;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
*/
private static final int PERIOD = 1000;
private static final String OBJECT_INSTANCE_ID = "DH-1";
- private final SystemUnderTestWrapper sut = new SystemUnderTestWrapper();
-
- @BeforeEach
- void setUp() {
- sut.start();
- }
+ private final SystemUnderTestWrapper sut = new SystemUnderTestWrapper(Duration.ofSeconds(10));
@AfterEach
void tearDown() {
}
@Test
- void singleMessageTest() throws Exception {
+ void singleMessageTest_withUnsecureConnection() throws Exception {
// given
+ final VesEvent sampleEvent = createSimpleVesEvent();
+ final Flux<VesEvent> input = Flux.just(sampleEvent);
+
+ // when
+ sut.start();
+ final ByteBuf receivedData = sut.blockingSend(input);
+ // then
+ WireProtocolDecoder decoded = WireProtocolDecoder.decode(receivedData);
+ assertThat(decoded.type).isEqualTo(PayloadType.PROTOBUF.getPayloadTypeBytes().getShort());
+ assertThat(decoded.event).isEqualTo(sampleEvent);
+
+ }
+
+ @Test
+ void singleMessageTest_withSecureConnection() throws Exception {
+ // given
final VesEvent sampleEvent = createSimpleVesEvent();
final Flux<VesEvent> input = Flux.just(sampleEvent);
// when
+ sut.startSecure();
final ByteBuf receivedData = sut.blockingSend(input);
// then
WireProtocolDecoder decoded = WireProtocolDecoder.decode(receivedData);
assertThat(decoded.type).isEqualTo(PayloadType.PROTOBUF.getPayloadTypeBytes().getShort());
assertThat(decoded.event).isEqualTo(sampleEvent);
+
}
private VesEvent createSimpleVesEvent() {
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
import io.netty.buffer.ByteBuf;
+import io.netty.handler.ssl.SslContext;
import io.vavr.collection.HashSet;
import io.vavr.control.Try;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.Optional;
-import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
-import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
-import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
+import org.onap.dcaegen2.services.sdk.security.ssl.*;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions;
public class SystemUnderTestWrapper {
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);
- private final DummyCollector collector = new DummyCollector();
+ private static final String TRUST_CERT_PATH = "/trust.p12";
+ private static final String TRUST_PASSWORD_PATH = "/trust.pass";
+ private static final String CLIENT_CERT_PATH = "/client.p12";
+ private static final String CLIENT_PASSWORD_PATH = "/client.pass";
+ private static final String SERVER_CERT_PATH = "/server.p12";
+ private static final String SERVER_PASSWORD_PATH = "/server.pass";
+
+ private DummyCollector collector;
private HvVesProducer cut;
private final Duration timeout;
+ private final SslFactory sslFactory = new SslFactory();
public SystemUnderTestWrapper(Duration timeout) {
this.timeout = timeout;
}
public void startSecure() {
- start(ImmutableProducerOptions.builder()
- .securityKeys(ImmutableSecurityKeys.builder()
- .keyStore(ImmutableSecurityKeysStore.of(resource("/client.p12").get()))
- .keyStorePassword(Passwords.fromResource("/client.pass"))
- .trustStore(ImmutableSecurityKeysStore.of(resource("/trust.p12").get()))
- .trustStorePassword(Passwords.fromResource("/trust.pass"))
- .build()));
+ collector = createCollectorWithEnabledSSL();
+
+ final SecurityKeys producerSecurityKeys = ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(resource(CLIENT_CERT_PATH).get()))
+ .keyStorePassword(Passwords.fromResource(CLIENT_PASSWORD_PATH))
+ .trustStore(ImmutableSecurityKeysStore.of(resource(TRUST_CERT_PATH).get()))
+ .trustStorePassword(Passwords.fromResource(TRUST_PASSWORD_PATH))
+ .build();
+ start(ImmutableProducerOptions.builder().securityKeys(producerSecurityKeys));
}
public void start() {
+ collector = new DummyCollector(Optional.empty());
start(createDefaultOptions());
}
return collector.dataFromFirstClient();
}
+ private DummyCollector createCollectorWithEnabledSSL() {
+ final SecurityKeys collectorSecurityKeys = ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(resource(SERVER_CERT_PATH).get()))
+ .keyStorePassword(Passwords.fromResource(SERVER_PASSWORD_PATH))
+ .trustStore(ImmutableSecurityKeysStore.of(resource(TRUST_CERT_PATH).get()))
+ .trustStorePassword(Passwords.fromResource(TRUST_PASSWORD_PATH))
+ .build();
+ final SslContext collectorSslContext = sslFactory.createSecureServerContext(collectorSecurityKeys);
+ return new DummyCollector(Optional.of(collectorSslContext));
+ }
+
private Builder createDefaultOptions() {
return ImmutableProducerOptions.builder();
}