HV-VES Client - ProducerOptions 37/75937/6
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 17 Jan 2019 10:42:35 +0000 (11:42 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 18 Jan 2019 09:58:02 +0000 (10:58 +0100)
* ProducerOptions written
* very basic client implementation
* added vavr dependency so it's easier to handle Java

Issue-ID: DCAEGEN2-1098
Change-Id: I680948c61174f60cd78c8ee39b6f92419f913d36
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
21 files changed:
pom.xml
services/hv-ves-client/producer/api/pom.xml
services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/FactoryLoader.java
services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducerFactory.java
services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java [new file with mode: 0644]
services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java [new file with mode: 0644]
services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java [moved from services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/ProducerOptions.java with 57% similarity]
services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java [new file with mode: 0644]
services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java [new file with mode: 0644]
services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java [new file with mode: 0644]
services/hv-ves-client/producer/api/src/test/resources/password.txt [new file with mode: 0644]
services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
services/hv-ves-client/producer/ct/src/test/resources/client.p12 [new file with mode: 0644]
services/hv-ves-client/producer/ct/src/test/resources/client.pass [new file with mode: 0644]
services/hv-ves-client/producer/ct/src/test/resources/trust.p12 [new file with mode: 0644]
services/hv-ves-client/producer/ct/src/test/resources/trust.pass [new file with mode: 0644]
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index fe2684e..038fc31 100644 (file)
--- a/pom.xml
+++ b/pom.xml
         <artifactId>value</artifactId>
         <version>${immutables.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.vavr</groupId>
+        <artifactId>vavr</artifactId>
+        <version>0.9.3</version>
+      </dependency>
       <dependency>
         <groupId>org.immutables</groupId>
         <artifactId>gson</artifactId>
index 44e15c9..1804b16 100644 (file)
             <groupId>org.reactivestreams</groupId>
             <artifactId>reactive-streams</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.vavr</groupId>
+            <artifactId>vavr</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.immutables</groupId>
             <artifactId>value</artifactId>
index f90867d..a7ad836 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
 
-import java.util.Iterator;
+import io.vavr.collection.Stream;
 import java.util.ServiceLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,17 +35,12 @@ final class FactoryLoader {
     private static final Logger LOGGER = LoggerFactory.getLogger(FactoryLoader.class);
 
     static <T> T findInstance(Class<T> clazz) {
-        Iterator<T> instances = ServiceLoader.load(clazz).iterator();
-        if (instances.hasNext()) {
-            final T head = instances.next();
-            if (instances.hasNext()) {
-                LOGGER.warn("Found more than one implementation of {} on the class path. Using {}.",
-                        clazz.getSimpleName(), head.getClass().getName());
-            }
-            return head;
-        } else {
-            throw new IllegalStateException(
-                    "No " + clazz.getSimpleName() + " instances were configured. Are you sure you have runtime dependency on an implementation module?");
-        }
+        return Stream.ofAll(ServiceLoader.load(clazz))
+                .headOption()
+                .peek(head -> LOGGER.info(
+                        " Using {} as a {} implementation.", head.getClass().getName(), clazz.getSimpleName()))
+                .getOrElseThrow(() -> new IllegalStateException(
+                        "No " + clazz.getSimpleName() + " instances were configured. "
+                                + "Are you sure you have runtime dependency on an implementation module?"));
     }
 }
index 43cd3fe..3359e54 100644 (file)
@@ -20,6 +20,7 @@
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
 
 import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import org.reactivestreams.Publisher;
 
@@ -32,7 +33,7 @@ import org.reactivestreams.Publisher;
  * <p>Sample usage with <a href="https://projectreactor.io/">Project Reactor</a>:</p>
  *
  * <pre>
- *     ProducerOptions options = {@link ImmutableProducerOptions}.builder(). ... .build()
+ *     ProducerOptions options = ImmutableProducerOptions.builder(). ... .build()
  *     HvVesProducer hvVes = {@link HvVesProducerFactory}.create(options);
  *
  *     Flux.just(msg1, msg2, msg3)
@@ -41,6 +42,7 @@ import org.reactivestreams.Publisher;
  * </pre>
  *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
  * @since 1.1.1
  */
 @FunctionalInterface
@@ -51,6 +53,9 @@ public interface HvVesProducer {
      * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in
      * case of any problem with sending the messages.
      *
+     * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once
+     * feeding it with a stream of consecutive events.
+     *
      * @param messages source of the messages to be sent
      * @return empty publisher which completes after messages are sent or error occurs
      * @since 1.1.1
index 3b3f91a..1e28fbd 100644 (file)
@@ -20,6 +20,7 @@
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
 
 import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions;
 
 /**
  * <p>
@@ -32,12 +33,13 @@ import org.jetbrains.annotations.NotNull;
  *
  * <pre>
  *     {@link HvVesProducer} producer = HvVesProducerFactory.create(
- *          {@link ImmutableProducerOptions}.builder().
+ *          ImmutableProducerOptions.builder().
  *              ...
  *              .build())
  * </pre>
  *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
  * @since 1.1.1
  */
 public abstract class HvVesProducerFactory {
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Password.java
new file mode 100644 (file)
index 0000000..79ae32a
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import io.vavr.CheckedFunction1;
+import io.vavr.Function1;
+import io.vavr.control.Try;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Simple password representation.
+ *
+ * A password can be used only once. After it the corresponding memory is zeroed.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.1
+ */
+public class Password {
+
+    private char[] value;
+
+    public Password(@NotNull char[] value) {
+        this.value = value;
+    }
+
+    /**
+     * Consume the password.
+     *
+     * After consumption following uses of this method will return Failure(GeneralSecurityException).
+     *
+     * @param user of the password
+     */
+    public <T> Try<T> use(Function1<char[], Try<T>> user) {
+        if (value == null)
+            return Try.failure(new GeneralSecurityException("Password had been already used so it is in cleared state"));
+
+        try {
+            return user.apply(value);
+        } finally {
+            clear();
+        }
+    }
+
+    public <T> Try<T> useChecked(CheckedFunction1<char[], T> user) {
+        return use(CheckedFunction1.liftTry(user));
+    }
+
+    public void clear() {
+        Arrays.fill(value, (char) 0);
+        value = null;
+    }
+}
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/Passwords.java
new file mode 100644 (file)
index 0000000..cbadfea
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import io.vavr.control.Try;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Utility functions for loading passwords.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.1
+ */
+public final class Passwords {
+
+    private Passwords() {
+    }
+
+    public static @NotNull Try<Password> fromFile(File file) {
+        return fromPath(file.toPath());
+    }
+
+    public static @NotNull Try<Password> fromPath(Path path) {
+        return Try.of(() -> {
+            final byte[] bytes = Files.readAllBytes(path);
+            final CharBuffer password = decodeChars(bytes);
+            final char[] result = convertToCharArray(password);
+            return new Password(result);
+        });
+    }
+
+    public static @NotNull Try<Password> fromResource(String resource) {
+        return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI()))
+                .flatMap(Passwords::fromPath);
+    }
+
+    private static @NotNull CharBuffer decodeChars(byte[] bytes) {
+        try {
+            return Charset.defaultCharset().decode(ByteBuffer.wrap(bytes));
+        } finally {
+            Arrays.fill(bytes, (byte) 0);
+        }
+    }
+
+    private static char[] convertToCharArray(CharBuffer password) {
+        try {
+            final char[] result = new char[password.limit()];
+            password.get(result);
+            return result;
+        } finally {
+            password.flip();
+            clearBuffer(password);
+        }
+    }
+
+    private static void clearBuffer(CharBuffer password) {
+        while (password.remaining() > 0) {
+            password.put((char) 0);
+        }
+    }
+}
@@ -1,34 +1,35 @@
 /*
- * ============LICENSE_START=======================================================
+ * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
- * ================================================================================
+ * =========================================================
  * Copyright (C) 2019 Nokia. All rights reserved.
- * ================================================================================
+ * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
 
+import io.vavr.collection.Set;
 import java.net.InetSocketAddress;
-import java.util.Set;
 import org.immutables.value.Value;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.1
  */
 @Value.Immutable
-@Value.Style(depluralize = true, depluralizeDictionary = "address:addresses")
 public interface ProducerOptions {
 
     /**
@@ -37,5 +38,23 @@ public interface ProducerOptions {
      * @return configured collector endpoints
      * @since 1.1.1
      */
+    @NotNull
     Set<InetSocketAddress> collectorAddresses();
+
+    /**
+     * Security keys definition used when connecting to the collector.
+
+     *
+     * @return security keys definition or null when plain TCP sockets are to be used.
+     * @since 1.1.1
+     */
+    @Nullable
+    SecurityKeys securityKeys();
+
+    @Value.Check
+    default void validate() {
+        if (collectorAddresses().isEmpty()) {
+            throw new IllegalArgumentException("address list cannot be empty");
+        }
+    }
 }
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/SecurityKeys.java
new file mode 100644 (file)
index 0000000..66af32f
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import java.nio.file.Path;
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.1
+ */
+@Value.Immutable
+public interface SecurityKeys {
+    Path keyStore();
+    Password keyStorePassword();
+
+    Path trustStore();
+    Password trustStorePassword();
+}
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordTest.java
new file mode 100644 (file)
index 0000000..fbfeb5d
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+import io.vavr.collection.Array;
+import io.vavr.control.Try;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ */
+class PasswordTest {
+
+    @Test
+    void use_shouldInvokeConsumerWithStoredPassword() {
+        // given
+        final String password = "hej ho";
+        final Password cut = new Password(password.toCharArray());
+
+        // when
+        String result = cut.useChecked(String::new).get();
+
+        // then
+        assertThat(result).isEqualTo(password);
+    }
+
+    @Test
+    void use_shouldClearPasswordAfterUse() {
+        // given
+        final char[] passwordChars = "hej ho".toCharArray();
+        final Password cut = new Password(passwordChars);
+
+        // when
+        useThePassword(cut);
+
+        // then
+        assertAllCharsAreNull(passwordChars);
+    }
+
+    @Test
+    void use_shouldFail_whenItWasAlreadyCalled() {
+        // given
+        final Password cut = new Password("ala ma kota".toCharArray());
+
+        // when & then
+        useThePassword(cut).get();
+
+        assertThatExceptionOfType(GeneralSecurityException.class).isThrownBy(() ->
+                useThePassword(cut).get());
+    }
+
+    @Test
+    void use_shouldFail_whenItWasCleared() {
+        // given
+        final Password cut = new Password("ala ma kota".toCharArray());
+
+        // when & then
+        cut.clear();
+
+        assertThatExceptionOfType(GeneralSecurityException.class).isThrownBy(() ->
+                useThePassword(cut).get());
+    }
+
+    @Test
+    void clear_shouldClearThePassword() {
+        // given
+        final char[] passwordChars = "hej ho".toCharArray();
+        final Password cut = new Password(passwordChars);
+
+        // when
+        cut.clear();
+
+        // then
+        assertAllCharsAreNull(passwordChars);
+    }
+
+    private Try<Object> useThePassword(Password cut) {
+        return cut.use((pass) -> Try.success(42));
+    }
+
+    private void assertAllCharsAreNull(char[] passwordChars) {
+        assertThat(Array.ofAll(passwordChars).forAll(ch -> ch == '\0'))
+                .describedAs("all characters in " + Arrays.toString(passwordChars) + " should be == '\\0'")
+                .isTrue();
+    }
+}
\ No newline at end of file
diff --git a/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java b/services/hv-ves-client/producer/api/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PasswordsTest.java
new file mode 100644 (file)
index 0000000..9f91afb
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.fail;
+
+import io.vavr.control.Try;
+import java.io.File;
+import java.net.URISyntaxException;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since January 2019
+ */
+class PasswordsTest {
+
+    @Test
+    void fromFile() {
+        // given
+        final File file = new File("./src/test/resources/password.txt");
+
+        // when
+        final Try<Password> result = Passwords.fromFile(file);
+
+        // then
+        assertSuccessful(result);
+        assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line");
+    }
+
+    @Test
+    void fromPath() throws URISyntaxException {
+        // given
+        final Path path = Paths.get(PasswordsTest.class.getResource("/password.txt").toURI());
+
+        // when
+        final Try<Password> result = Passwords.fromPath(path);
+
+        // then
+        assertSuccessful(result);
+        assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line");
+    }
+
+    @Test
+    void fromPath_shouldFail_whenNotFound() {
+        // given
+        final Path path = Paths.get("/", UUID.randomUUID().toString());
+
+        // when
+        final Try<Password> result = Passwords.fromPath(path);
+
+        // then
+        assertThat(result.isFailure()).describedAs("Try.failure?").isTrue();
+        assertThat(result.getCause()).isInstanceOf(NoSuchFileException.class);
+    }
+
+    @Test
+    void fromResource() {
+        // given
+        final String resource = "/password.txt";
+
+        // when
+        final Try<Password> result = Passwords.fromResource(resource);
+
+        // then
+        assertSuccessful(result);
+        assertThat(extractPassword(result)).isEqualTo("ja baczewski\n2nd line");
+    }
+
+    private void assertSuccessful(Try<Password> result) {
+        assertThat(result.isSuccess()).describedAs("Try.success?").isTrue();
+    }
+
+    private String extractPassword(Try<Password> result) {
+        return result.flatMap(pass -> pass.useChecked(String::new)).get();
+    }
+}
\ No newline at end of file
diff --git a/services/hv-ves-client/producer/api/src/test/resources/password.txt b/services/hv-ves-client/producer/api/src/test/resources/password.txt
new file mode 100644 (file)
index 0000000..93e4a00
--- /dev/null
@@ -0,0 +1,2 @@
+ja baczewski
+2nd line
\ No newline at end of file
index 0aa51aa..213e976 100644 (file)
@@ -21,11 +21,12 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.onap.ves.VesEventOuterClass.CommonEventHeader;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import reactor.core.publisher.Flux;
 
@@ -49,12 +50,17 @@ class HvVesProducerIT {
     @Test
     void todo() {
         // given
-        final Flux<VesEvent> input = Flux.just(VesEvent.getDefaultInstance());
+        final VesEvent sampleEvent = VesEvent.newBuilder()
+                .setCommonEventHeader(CommonEventHeader.newBuilder()
+                        .setDomain("dummy")
+                        .build())
+                .setEventFields(ByteString.copyFrom(new byte[]{0, 1, 2, 3}))
+                .build();
+
+        final Flux<VesEvent> input = Flux.just(sampleEvent);
 
         // when
-        // This will currently fail
-        //final ByteBuf receivedData = sut.blockingSend(input);
-        final ByteBuf receivedData = ByteBufAllocator.DEFAULT.buffer().writeByte(8);
+        final ByteBuf receivedData = sut.blockingSend(input);
 
         // then
         assertThat(receivedData.readableBytes())
index f459c98..2cc2c0b 100644 (file)
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
 
 import io.netty.buffer.ByteBuf;
-
+import io.vavr.collection.HashSet;
+import io.vavr.control.Try;
 import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Duration;
-
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ImmutableProducerOptions.Builder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions.Builder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.Passwords;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -50,6 +53,16 @@ public class SystemUnderTestWrapper {
         this(DEFAULT_TIMEOUT);
     }
 
+    public void startSecure() {
+        start(ImmutableProducerOptions.builder()
+                .securityKeys(ImmutableSecurityKeys.builder()
+                        .keyStore(resource("/client.p12").get())
+                        .keyStorePassword(Passwords.fromResource("/client.pass").get())
+                        .trustStore(resource("/trust.p12").get())
+                        .trustStorePassword(Passwords.fromResource("/trust.pass").get())
+                        .build()));
+    }
+
     public void start() {
         start(createDefaultOptions());
     }
@@ -57,7 +70,7 @@ public class SystemUnderTestWrapper {
     public void start(ImmutableProducerOptions.Builder optionsBuilder) {
         InetSocketAddress collectorAddress = collector.start();
         cut = HvVesProducerFactory.create(
-                optionsBuilder.addCollectorAddress(collectorAddress).build());
+                optionsBuilder.collectorAddresses(HashSet.of(collectorAddress)).build());
     }
 
     public void stop() {
@@ -66,9 +79,6 @@ public class SystemUnderTestWrapper {
 
     public ByteBuf blockingSend(Flux<VesEvent> events) {
         events.transform(cut::send).subscribe();
-
-
-        Mono.from(cut.send(events)).block();
         collector.blockUntilFirstClientIsHandled(timeout);
         return collector.dataFromFirstClient();
     }
@@ -77,4 +87,8 @@ public class SystemUnderTestWrapper {
         return ImmutableProducerOptions.builder();
     }
 
+    private Try<Path> resource(String resource) {
+        return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI()));
+    }
+
 }
diff --git a/services/hv-ves-client/producer/ct/src/test/resources/client.p12 b/services/hv-ves-client/producer/ct/src/test/resources/client.p12
new file mode 100644 (file)
index 0000000..68a0fb2
Binary files /dev/null and b/services/hv-ves-client/producer/ct/src/test/resources/client.p12 differ
diff --git a/services/hv-ves-client/producer/ct/src/test/resources/client.pass b/services/hv-ves-client/producer/ct/src/test/resources/client.pass
new file mode 100644 (file)
index 0000000..e69c2de
--- /dev/null
@@ -0,0 +1 @@
+onaponap
\ No newline at end of file
diff --git a/services/hv-ves-client/producer/ct/src/test/resources/trust.p12 b/services/hv-ves-client/producer/ct/src/test/resources/trust.p12
new file mode 100644 (file)
index 0000000..ed7f62d
Binary files /dev/null and b/services/hv-ves-client/producer/ct/src/test/resources/trust.p12 differ
diff --git a/services/hv-ves-client/producer/ct/src/test/resources/trust.pass b/services/hv-ves-client/producer/ct/src/test/resources/trust.pass
new file mode 100644 (file)
index 0000000..e69c2de
--- /dev/null
@@ -0,0 +1 @@
+onaponap
\ No newline at end of file
index f50206f..ad402f9 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
 
+import io.netty.handler.ssl.SslContext;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.ProducerOptions;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.netty.tcp.TcpClient;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  */
 public class HvVesProducerFactoryImpl extends HvVesProducerFactory {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(HvVesProducerFactoryImpl.class);
+    private final SslFactory sslFactory = new SslFactory();
+
     @Override
     protected @NotNull HvVesProducer createProducer(ProducerOptions options) {
-        return new HvVesProducerImpl();
+        TcpClient tcpClient = TcpClient.create()
+                .addressSupplier(() -> options.collectorAddresses().head());
+
+        if (options.securityKeys() == null) {
+            LOGGER.warn("Using insecure connection");
+        } else {
+            LOGGER.info("Using secure tunnel");
+            final SslContext ctx = sslFactory.createSecureContext(options.securityKeys()).get();
+            tcpClient = tcpClient.secure(ssl-> ssl.sslContext(ctx));
+        }
+
+        return new HvVesProducerImpl(tcpClient);
     }
 }
index 25128d1..15038c3 100644 (file)
@@ -19,6 +19,8 @@
  */
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
 
+import io.netty.buffer.ByteBuf;
+import java.nio.charset.StandardCharsets;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
 import org.onap.ves.VesEventOuterClass.VesEvent;
@@ -27,18 +29,39 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.netty.NettyOutbound;
+import reactor.netty.tcp.TcpClient;
 
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  */
 public class HvVesProducerImpl implements HvVesProducer {
+
     private static final Logger LOGGER = LoggerFactory.getLogger(HvVesProducerImpl.class);
+    private final TcpClient tcpClient;
+
+    HvVesProducerImpl(TcpClient tcpClient) {
+        this.tcpClient = tcpClient;
+    }
 
     @Override
     public @NotNull Mono<Void> send(Publisher<VesEvent> messages) {
-        return Flux.from(messages)
-                .doOnNext(msg -> LOGGER.info("Not-so-dummy sending: {}", msg.toString()))
-                .then();
+        return tcpClient
+                .handle((inbound, outbound) -> handle(outbound, messages))
+                .connect().then();
+    }
+
+    private Publisher<Void> handle(NettyOutbound outbound, Publisher<VesEvent> messages) {
+        final Flux<ByteBuf> encodedMessages = Flux.from(messages)
+                .map(msg -> {
+                    LOGGER.debug("Encoding VesEvent '{}'", msg);
+                    final ByteBuf encodedMessage = outbound.alloc().buffer();
+                    encodedMessage.writeCharSequence(msg.getCommonEventHeader().getDomain(), StandardCharsets.UTF_8);
+                    encodedMessage.writeByte(0x0a);
+                    return encodedMessage;
+                });
+
+        return outbound.send(encodedMessages).then();
     }
 }
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/SslFactory.java
new file mode 100644 (file)
index 0000000..4661f59
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
+
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.vavr.Tuple;
+import io.vavr.control.Try;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.Password;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.SecurityKeys;
+
+/*
+ * TODO: To be merged with org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory
+ */
+public class SslFactory {
+
+    /**
+     * Function for creating secure ssl context.
+     *
+     * @param keys - Security keys to be used
+     * @return configured SSL context
+     */
+    public Try<SslContext> createSecureContext(final SecurityKeys keys) {
+        final Try<KeyManagerFactory> keyManagerFactory =
+                keyManagerFactory(keys.keyStore(), keys.keyStorePassword());
+        final Try<TrustManagerFactory> trustManagerFactory =
+                trustManagerFactory(keys.trustStore(), keys.trustStorePassword());
+
+        return Try.success(SslContextBuilder.forClient())
+                .flatMap(ctx -> keyManagerFactory.map(ctx::keyManager))
+                .flatMap(ctx -> trustManagerFactory.map(ctx::trustManager))
+                .mapTry(SslContextBuilder::build);
+    }
+
+    private Try<KeyManagerFactory> keyManagerFactory(Path path, Password password) {
+        return password.useChecked(passwordChars -> {
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+            kmf.init(loadKeyStoreFromFile(path, passwordChars), passwordChars);
+            return kmf;
+        });
+    }
+
+    private Try<TrustManagerFactory> trustManagerFactory(Path path, Password password) {
+        return password.useChecked(passwordChars -> {
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+            tmf.init(loadKeyStoreFromFile(path, passwordChars));
+            return tmf;
+        });
+    }
+
+    private KeyStore loadKeyStoreFromFile(Path path, char[] keyStorePassword)
+            throws GeneralSecurityException, IOException {
+        KeyStore ks = KeyStore.getInstance("pkcs12");
+        ks.load(Files.newInputStream(path, StandardOpenOption.READ), keyStorePassword);
+        return ks;
+    }
+}
\ No newline at end of file