Add send with raw payload method to HvVesProducer 92/77792/7
authorJakub Dudycz <jakub.dudycz@nokia.com>
Mon, 4 Feb 2019 14:18:28 +0000 (15:18 +0100)
committerJakub Dudycz <jakub.dudycz@nokia.com>
Fri, 8 Feb 2019 15:45:00 +0000 (16:45 +0100)
Change-Id: I430b176373c8c351105c1d10047aace63319dd7c
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1164

services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/HvVesProducer.java
services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/PayloadType.java [moved from services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java with 98% similarity]
services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/HvVesProducerIT.java
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerImpl.java
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java

index 3359e54..9e9ed39 100644 (file)
@@ -19,7 +19,9 @@
  */
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api;
 
+import java.nio.ByteBuffer;
 import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ProducerOptions;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import org.reactivestreams.Publisher;
@@ -37,7 +39,7 @@ import org.reactivestreams.Publisher;
  *     HvVesProducer hvVes = {@link HvVesProducerFactory}.create(options);
  *
  *     Flux.just(msg1, msg2, msg3)
- *          .transform(hvVes::send)
+ *          .transform(hvVes::sendRaw)
  *          .subscribe();
  * </pre>
  *
@@ -45,10 +47,10 @@ import org.reactivestreams.Publisher;
  * @see org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
  * @since 1.1.1
  */
-@FunctionalInterface
 public interface HvVesProducer {
+
     /**
-     * Send the messages to the collector.
+     * Send ves events to the collector.
      *
      * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in
      * case of any problem with sending the messages.
@@ -56,9 +58,31 @@ public interface HvVesProducer {
      * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once
      * feeding it with a stream of consecutive events.
      *
-     * @param messages source of the messages to be sent
-     * @return empty publisher which completes after messages are sent or error occurs
+     * @param messages source of ves events to be sent
+     * @return empty publisher which completes after ves events are sent or error occurs
      * @since 1.1.1
      */
     @NotNull Publisher<Void> send(Publisher<VesEvent> messages);
+
+    /**
+     * Send the specific type of messages as raw bytes to the collector.
+     *
+     * This is more generic version of @{@link #send(Publisher)},
+     * that accepts raw payload and explicit message type.
+     *
+     * Should be used when sending messages in format different from VES Common Event Format.
+     * As currently High-Volume VES Collector supports only VesEvent messages it is recommended to use the @{@link #send(Publisher)} method directly.
+     *
+     * Returns a Publisher that completes when all the messages are sent. The returned Publisher fails with an error in
+     * case of any problem with sending the messages.
+     *
+     * Each invocation of this method will yield a new TCP connection. It is recommended to call this method only once
+     * feeding it with a stream of consecutive events.
+     *
+     * @param messages source of raw messages to be sent
+     * @param payloadType type of messages to be sent
+     * @return empty publisher which completes after messages are sent or error occurs
+     * @since 1.1.1
+     */
+    @NotNull Publisher<Void> sendRaw(Publisher<ByteBuffer> messages, PayloadType payloadType);
 }
index 746aae7..247cfad 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.ct;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import io.netty.buffer.ByteBuf;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.PayloadType;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
 import org.onap.ves.MeasDataCollectionOuterClass;
 import org.onap.ves.VesEventOuterClass.CommonEventHeader;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import reactor.core.publisher.Flux;
 
-import static org.assertj.core.api.Assertions.assertThat;
-
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  */
@@ -71,7 +71,8 @@ class HvVesProducerIT {
     }
 
     private VesEvent createSimpleVesEvent() {
-        final MeasDataCollectionOuterClass.MeasDataCollection content = MeasDataCollectionOuterClass.MeasDataCollection.newBuilder()
+        final MeasDataCollectionOuterClass.MeasDataCollection content = MeasDataCollectionOuterClass.MeasDataCollection
+                .newBuilder()
                 .addMeasInfo(MeasDataCollectionOuterClass.MeasInfo.newBuilder()
                         .addMeasValues(MeasDataCollectionOuterClass.MeasValue.newBuilder()
                                 .addMeasResults(MeasDataCollectionOuterClass.MeasResult.newBuilder()
index 05873f6..b4a209f 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
 
+import io.netty.buffer.ByteBuf;
+import java.nio.ByteBuffer;
+import java.util.function.BiFunction;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import org.reactivestreams.Publisher;
-import reactor.core.publisher.Mono;
+import reactor.netty.NettyInbound;
 import reactor.netty.NettyOutbound;
 import reactor.netty.tcp.TcpClient;
 
@@ -36,23 +40,36 @@ public class HvVesProducerImpl implements HvVesProducer {
     private final TcpClient tcpClient;
     private final ProducerCore producerCore;
 
-
     HvVesProducerImpl(TcpClient tcpClient, ProducerCore producerCore) {
         this.tcpClient = tcpClient;
         this.producerCore = producerCore;
     }
 
     @Override
-    public @NotNull Mono<Void> send(Publisher<VesEvent> messages) {
-        return tcpClient
-            .handle((in, out) -> handle(messages, out))
-            .connect()
-            .then();
+    public @NotNull Publisher<Void> send(Publisher<VesEvent> messages) {
+        return handleConnection((in, out) -> handle(messages, out));
+    }
+
+    @Override
+    public @NotNull Publisher<Void> sendRaw(Publisher<ByteBuffer> messages, PayloadType payloadType) {
+        return handleConnection((in, out) -> handleRaw(messages, payloadType, out));
+    }
+
+    private Publisher<Void> handleConnection(
+            BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> handler) {
+        return tcpClient.handle(handler).connect().then();
     }
 
     private Publisher<Void> handle(Publisher<VesEvent> messages, NettyOutbound outbound) {
-        return outbound
-            .send(producerCore.encode(messages, outbound.alloc()))
-            .then();
+        return push(producerCore.encode(messages, outbound.alloc()), outbound);
+    }
+
+    private Publisher<Void> handleRaw(Publisher<ByteBuffer> messages, PayloadType payloadType,
+            NettyOutbound outbound) {
+        return push(producerCore.encode(messages, payloadType, outbound.alloc()), outbound);
+    }
+
+    private Publisher<Void> push(Publisher<ByteBuf> messages, NettyOutbound outbound) {
+        return outbound.send(messages).then();
     }
 }
index 49d54fe..3f06577 100644 (file)
@@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.vavr.control.Try;
+import java.nio.ByteBuffer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
@@ -45,11 +47,16 @@ public class ProducerCore {
     }
 
     public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) {
-        final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion);
         final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
         return Flux.from(messages)
                 .map(protobufEncoder::encode)
-                .map(wireFrameEncoder::encode)
+                .transform(payload -> encode(payload, PayloadType.PROTOBUF, allocator));
+    }
+
+    public Flux<ByteBuf> encode(Publisher<ByteBuffer> messages, PayloadType payloadType, ByteBufAllocator allocator) {
+        final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion);
+        return Flux.from(messages)
+                .map(payload -> wireFrameEncoder.encode(payload, payloadType))
                 .map(Try::get);
     }
 }
index 5bee846..73b0945 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
 
+import java.nio.ByteBuffer;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-
 /**
  * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
  */
index 29e3347..af33f43 100644 (file)
@@ -24,14 +24,11 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.vavr.control.Try;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
+import java.nio.ByteBuffer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-
-/**
- * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
- */
 public class WireFrameEncoder {
     private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class);
     private static final short MARKER_BYTE = 0xAA;
@@ -50,9 +47,10 @@ public class WireFrameEncoder {
         this.wireFrameVersion = wireFrameVersion;
     }
 
-    public Try<ByteBuf> encode(ByteBuffer payload) {
-        return Try.of(() -> encodeMessageAs(payload, PayloadType.PROTOBUF))
-                .onFailure((ex) -> LOGGER.warn("Failed to encode payload", ex));
+
+    public Try<ByteBuf> encode(ByteBuffer payload, PayloadType payloadType) {
+        return Try.of(() -> encodeMessageAs(payload, payloadType))
+                .onFailure(ex -> LOGGER.warn("Failed to encode payload", ex));
     }
 
     private ByteBuf encodeMessageAs(ByteBuffer payload, PayloadType payloadType) throws WTPEncodingException {
@@ -79,9 +77,9 @@ public class WireFrameEncoder {
     }
 
     private void writePayloadMessageHeaderEnding(ByteBuf encodedMessage,
-                                                 PayloadType payloadType,
-                                                 ByteBuffer payload,
-                                                 int payloadSize) {
+            PayloadType payloadType,
+            ByteBuffer payload,
+            int payloadSize) {
         encodedMessage.writeBytes(payloadType.getPayloadTypeBytes());
         encodedMessage.writeInt(payloadSize);
         encodedMessage.writeBytes(payload);
index b79b0cf..86c67f0 100644 (file)
@@ -20,8 +20,8 @@
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockingDetails;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -31,58 +31,85 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.vavr.control.Try;
+import java.nio.ByteBuffer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
-import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import reactor.core.publisher.Flux;
 
-import java.nio.ByteBuffer;
-
 /**
  * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
  */
-public class ProducerCoreTest {
+class ProducerCoreTest {
 
     private ProducerCore producerCore;
     private EncodersFactory encodersFactoryMock;
     private WireFrameVersion wireFrameVersion;
 
     @BeforeEach
-    public void setUp() {
+    void setUp() {
         encodersFactoryMock = mock(EncodersFactory.class);
         wireFrameVersion = mock(WireFrameVersion.class);
         producerCore = new ProducerCore(encodersFactoryMock, wireFrameVersion);
     }
 
     @Test
-    public void encode_should_encode_message_stream_to_wire_frame() {
+    void encode_should_encode_raw_message_stream_to_wire_frame() {
+        final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class);
+        final ByteBuffer payload = ByteBuffer.wrap(new byte[3]);
+        final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5]));
+
+        when(wireFrameEncoder.encode(payload, PayloadType.UNDEFINED)).thenReturn(wireFrameBuffer);
+        when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion))
+                .thenReturn(wireFrameEncoder);
+
+        // given
+        final int messageStreamSize = 2;
+        final Flux<ByteBuffer> rawMessageStream = Flux.just(payload).repeat(messageStreamSize - 1);
+
+        // when
+        final ByteBuf lastMessage = producerCore
+                .encode(rawMessageStream, PayloadType.UNDEFINED, ByteBufAllocator.DEFAULT)
+                .blockLast();
+
+        // then
+        verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
+        verify(wireFrameEncoder, times(messageStreamSize)).encode(payload, PayloadType.UNDEFINED);
+
+        assertThat(lastMessage).isNotNull();
+        assertThat(lastMessage).isEqualTo(wireFrameBuffer.get());
+    }
+
+    @Test
+    void encode_should_encode_ves_event_stream_to_wire_frame() {
         final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class);
         final ProtobufEncoder protobufEncoder = mock(ProtobufEncoder.class);
+
+        final VesEvent vesEvent = defaultVesEvent();
         final ByteBuffer protoBuffer = ByteBuffer.wrap(new byte[3]);
         final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5]));
 
-        when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer);
-        when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer);
+        when(protobufEncoder.encode(vesEvent)).thenReturn(protoBuffer);
+        when(wireFrameEncoder.encode(protoBuffer, PayloadType.PROTOBUF)).thenReturn(wireFrameBuffer);
         when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder);
-        when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)).
-                thenReturn(wireFrameEncoder);
+        when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion))
+                .thenReturn(wireFrameEncoder);
 
         // given
         final int messageStreamSize = 2;
-        final Flux<VesEvent> messages = Flux.just(defaultVesEvent()).repeat(messageStreamSize - 1);
+        final Flux<VesEvent> messages = Flux.just(vesEvent).repeat(messageStreamSize - 1);
 
         // when
         final ByteBuf lastMessage = producerCore.encode(messages, ByteBufAllocator.DEFAULT).blockLast();
 
         // then
         verify(encodersFactoryMock).createProtobufEncoder();
-        verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
-        verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class));
-        verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer);
+        verify(protobufEncoder, times(messageStreamSize)).encode(vesEvent);
 
         assertThat(lastMessage).isNotNull();
         assertThat(lastMessage).isEqualTo(wireFrameBuffer.get());
index d79d0dc..7352fbf 100644 (file)
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 
 import java.nio.ByteBuffer;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType;
 
 public class WireFrameEncoderTest {
     private static final byte MARKER_BYTE = (byte) 0xAA;
@@ -51,7 +52,7 @@ public class WireFrameEncoderTest {
     void encode_givenNullPayload_shouldThrowEncodingException() {
         final ByteBuffer buffer = null;
 
-        Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+        Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF);
 
         assertThat(encodedBuffer.isFailure()).isTrue();
         assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
@@ -61,7 +62,7 @@ public class WireFrameEncoderTest {
     void encode_givenEmptyPayload_shouldThrowEncodingException() {
         final ByteBuffer buffer = ByteBuffer.allocateDirect(0);
 
-        Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+        Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF);
 
         assertThat(encodedBuffer.isFailure()).isTrue();
         assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
@@ -73,7 +74,7 @@ public class WireFrameEncoderTest {
         final int bufferSize = payloadBytes.length;
         final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes);
 
-        final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+        final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer, PayloadType.PROTOBUF);
 
         assertThat(encodedBuffer.isSuccess()).isTrue();
         final ByteBuf actualEncodedBuffer = encodedBuffer.get();
@@ -95,7 +96,7 @@ public class WireFrameEncoderTest {
         final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes);
 
         // when
-        final Try<ByteBuf> encodedBuffer = encoder.encode(buffer);
+        final Try<ByteBuf> encodedBuffer = encoder.encode(buffer, PayloadType.PROTOBUF);
 
         // then
         assertThat(encodedBuffer.isSuccess()).isTrue();