Encode outgoing packages using WTP 38/77438/4
authorFilip Krzywka <filip.krzywka@nokia.com>
Mon, 28 Jan 2019 09:24:55 +0000 (10:24 +0100)
committerFilip Krzywka <filip.krzywka@nokia.com>
Mon, 28 Jan 2019 13:56:35 +0000 (14:56 +0100)
Change-Id: I5a2c14846168bcc1a77bec6a96ecdd3114c5016a
Issue-ID: DCAEGEN2-1069
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java [new file with mode: 0644]
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoder.java
services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/ProtobufEncoderTest.java
services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java

index baa6d6b..3000e3d 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.vavr.control.Try;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
@@ -42,9 +43,10 @@ public class ProducerCore {
 
     public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) {
         final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator);
-        final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(allocator);
+        final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
         return Flux.from(messages)
-            .map(protobufEncoder::encode)
-            .map(wireFrameEncoder::encode);
+                .map(protobufEncoder::encode)
+                .map(wireFrameEncoder::encode)
+                .map(Try::get);
     }
 }
index 1d7b8b6..fbe8ea3 100644 (file)
@@ -23,8 +23,8 @@ import io.netty.buffer.ByteBufAllocator;
 
 public class EncodersFactory {
 
-    public ProtobufEncoder createProtobufEncoder(ByteBufAllocator allocator) {
-        return new ProtobufEncoder(allocator);
+    public ProtobufEncoder createProtobufEncoder() {
+        return new ProtobufEncoder();
     }
 
     public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator) {
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/PayloadType.java
new file mode 100644 (file)
index 0000000..ad10c04
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
+
+import java.nio.ByteBuffer;
+
+public enum PayloadType {
+    UNDEFINED(new byte[]{0x00, 0x00}),
+    PROTOBUF(new byte[]{0x00, 0x01});
+
+    private final byte[] payloadTypeBytes;
+
+    PayloadType(byte[] payloadTypeBytes) {
+        this.payloadTypeBytes = payloadTypeBytes;
+    }
+
+    public ByteBuffer getPayloadTypeBytes() {
+        return ByteBuffer.wrap(payloadTypeBytes).asReadOnlyBuffer();
+    }
+}
index bb861c2..305716f 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+
 /**
  * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
  */
 public class ProtobufEncoder {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufEncoder.class);
-    private final ByteBufAllocator allocator;
-
-    public ProtobufEncoder(ByteBufAllocator allocator) {
-        this.allocator = allocator;
-    }
 
-    public ByteBuf encode(VesEvent event) {
+    public ByteBuffer encode(VesEvent event) {
         LOGGER.debug("Encoding VesEvent '{}'", event);
-        return allocator.buffer().writeBytes(event.toByteArray());
+        return event.toByteString().asReadOnlyByteBuffer();
     }
 }
index a0807c6..a946cea 100644 (file)
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
+
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.vavr.control.Try;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
 
 /**
  * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
  */
 public class WireFrameEncoder {
+    private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class);
+    private static final short MARKER_BYTE = 0xAA;
+    private static final short SUPPORTED_VERSION_MAJOR = 0x01;
+    private static final short SUPPORTED_VERSION_MINOR = 0x00;
+    private static final int RESERVED_BYTES_COUNT = 3;
+    private static final int HEADER_SIZE = 1 * Byte.BYTES +         // marker
+            2 * Byte.BYTES +                                        // single byte fields (versions)
+            RESERVED_BYTES_COUNT * Byte.BYTES +                     // reserved bytes
+            1 * Short.BYTES +                                       // paylaod type
+            1 * Integer.BYTES;                                      // payload length
 
     private final ByteBufAllocator allocator;
 
@@ -33,11 +49,47 @@ public class WireFrameEncoder {
         this.allocator = allocator;
     }
 
-    public ByteBuf encode(ByteBuf payload) {
-        final ByteBuf encodedMessage = allocator.buffer();
-        encodedMessage.writeByte(0xAA);
-        encodedMessage.writeBytes(payload);
-        encodedMessage.writeByte(0x0a);
+    public Try<ByteBuf> encode(ByteBuffer payload) {
+        return Try.of(() -> encodeMessageAs(payload, PayloadType.PROTOBUF))
+                .onFailure((ex) -> LOGGER.warn("Failed to encode payload", ex));
+    }
+
+    private ByteBuf encodeMessageAs(ByteBuffer payload, PayloadType payloadType) throws WTPEncodingException {
+        if (payload == null) {
+            throw new WTPEncodingException("Payload is null");
+        }
+
+        final int payloadSize = payload.remaining();
+        if (payloadSize == 0) {
+            throw new WTPEncodingException("Payload is empty");
+        }
+
+        final ByteBuf encodedMessage = allocator.buffer(HEADER_SIZE + payloadSize);
+        writeBasicWTPFrameHeaderBeginning(encodedMessage);
+        writePayloadMessageHeaderEnding(encodedMessage, payloadType, payload, payloadSize);
         return encodedMessage;
     }
+
+    private void writeBasicWTPFrameHeaderBeginning(ByteBuf encodedMessage) {
+        encodedMessage.writeByte(MARKER_BYTE);
+        encodedMessage.writeByte(SUPPORTED_VERSION_MAJOR);
+        encodedMessage.writeByte(SUPPORTED_VERSION_MINOR);
+        encodedMessage.writeZero(RESERVED_BYTES_COUNT);
+    }
+
+    private void writePayloadMessageHeaderEnding(ByteBuf encodedMessage,
+                                                 PayloadType payloadType,
+                                                 ByteBuffer payload,
+                                                 int payloadSize) {
+        encodedMessage.writeBytes(payloadType.getPayloadTypeBytes());
+        encodedMessage.writeInt(payloadSize);
+        encodedMessage.writeBytes(payload);
+    }
 }
+
+
+class WTPEncodingException extends RuntimeException {
+    WTPEncodingException(String message) {
+        super(message);
+    }
+}
\ No newline at end of file
index c4211ff..02cc6e5 100644 (file)
@@ -30,6 +30,7 @@ import static org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.vavr.control.Try;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
@@ -38,6 +39,8 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encode
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import reactor.core.publisher.Flux;
 
+import java.nio.ByteBuffer;
+
 /**
  * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
  */
@@ -56,12 +59,12 @@ public class ProducerCoreTest {
     public void encode_should_encode_message_stream_to_wire_frame() {
         final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class);
         final ProtobufEncoder protobufEncoder = mock(ProtobufEncoder.class);
-        final ByteBuf protoBuffer = Unpooled.copiedBuffer(new byte[3]);
-        final ByteBuf wireFrameBuffer = Unpooled.copiedBuffer(new byte[5]);
+        final ByteBuffer protoBuffer = ByteBuffer.wrap(new byte[3]);
+        final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5]));
 
         when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer);
         when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer);
-        when(encodersFactoryMock.createProtobufEncoder(ByteBufAllocator.DEFAULT)).thenReturn(protobufEncoder);
+        when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder);
         when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT)).thenReturn(wireFrameEncoder);
 
         // given
@@ -72,12 +75,12 @@ public class ProducerCoreTest {
         final ByteBuf lastMessage = producerCore.encode(messages, ByteBufAllocator.DEFAULT).blockLast();
 
         // then
-        verify(encodersFactoryMock).createProtobufEncoder(ByteBufAllocator.DEFAULT);
+        verify(encodersFactoryMock).createProtobufEncoder();
         verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT);
         verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class));
         verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer);
 
         assertThat(lastMessage).isNotNull();
-        assertThat(lastMessage).isEqualTo(wireFrameBuffer);
+        assertThat(lastMessage).isEqualTo(wireFrameBuffer.get());
     }
 }
index c7439ce..3065db2 100644 (file)
@@ -17,6 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
+
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -34,7 +35,7 @@ public class EncodersFactoryTest {
     @Test
     public void factory_methods_should_create_non_null_encoders_objects() {
         // when
-        final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(ByteBufAllocator.DEFAULT);
+        final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
         final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT);
 
         // then
index 042874c..ba5514e 100644 (file)
@@ -27,9 +27,11 @@ import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.utils.VesEvents;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 
+import java.nio.ByteBuffer;
+
 public class ProtobufEncoderTest {
 
-    private final ProtobufEncoder protobufEncoder = new ProtobufEncoder(ByteBufAllocator.DEFAULT);
+    private final ProtobufEncoder protobufEncoder = new ProtobufEncoder();
 
     @Test
     void todo() {
@@ -37,9 +39,9 @@ public class ProtobufEncoderTest {
         final VesEvent message = VesEvents.defaultVesEvent();
 
         // when
-        final ByteBuf encodedMessage = protobufEncoder.encode(message);
+        final ByteBuffer encodedMessage = protobufEncoder.encode(message);
 
         // then
-        assertThat(encodedMessage.readableBytes()).isGreaterThan(0);
+        assertThat(encodedMessage.remaining()).isGreaterThan(0);
     }
 }
index 97c38cf..a0a67d9 100644 (file)
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
+
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import io.vavr.control.Try;
+import org.junit.jupiter.api.Test;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
 
 public class WireFrameEncoderTest {
+    private static final byte MARKER_BYTE = (byte) 0xAA;
+    private static final byte SUPPORTED_VERSION_MAJOR = (byte) 0x01;
+    private static final byte SUPPORTED_VERSION_MINOR = (byte) 0x00;
+    private static final int RESERVED_BYTES_COUNT = 3;
+    private static final int HEADER_SIZE = 1 * Byte.BYTES +         // marker
+            2 * Byte.BYTES +                                        // single byte fields (versions)
+            RESERVED_BYTES_COUNT * java.lang.Byte.BYTES +           // reserved bytes
+            1 * Short.BYTES +                                       // paylaod type
+            1 * Integer.BYTES;                                      // payload length
 
     private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT);
 
     @Test
-    void todo() {
-        // given
-        final ByteBuf buffer = Unpooled.buffer(0);
+    void encode_givenNullPayload_shouldThrowEncodingException() {
+        final ByteBuffer buffer = null;
 
-        // when
-        final ByteBuf encodedBuffer = wireFrameEncoder.encode(buffer);
+        Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
 
-        // then
-        assertThat(encodedBuffer.readableBytes()).isGreaterThan(0);
+        assertThat(encodedBuffer.isFailure()).isTrue();
+        assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
     }
 
+    @Test
+    void encode_givenEmptyPayload_shouldThrowEncodingException() {
+        final ByteBuffer buffer = ByteBuffer.allocateDirect(0);
+
+        Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+
+        assertThat(encodedBuffer.isFailure()).isTrue();
+        assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
+    }
+
+    @Test
+    void encode_givenSomePayloadBytes_shouldCreateValidGPBFrameWithPayloadAtTheEnd() {
+        final byte[] payloadBytes = new byte[]{0x1A, 0x2B, 0x3C};
+        final int bufferSize = payloadBytes.length;
+        final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes);
+
+        final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+
+        assertThat(encodedBuffer.isSuccess()).isTrue();
+        final ByteBuf actualEncodedBuffer = encodedBuffer.get();
+        assertBufferSizeIs(actualEncodedBuffer, HEADER_SIZE + bufferSize);
+        assertValidHeaderBeggining(actualEncodedBuffer);
+        skipReservedBytes(actualEncodedBuffer);
+        assertNextBytesAreInOrder(actualEncodedBuffer, (byte) 0x00, (byte) 0x01);
+        assertNextBytesAreInOrder(actualEncodedBuffer, intToBytes(bufferSize));
+        assertNextBytesAreInOrder(actualEncodedBuffer, payloadBytes);
+        assertAllBytesVerified(actualEncodedBuffer);
+    }
+
+    private void assertNextBytesAreInOrder(ByteBuf encodedBuffer, byte... bytes) {
+        for (int i = 0; i < bytes.length; i++) {
+            assertThat(encodedBuffer.readByte())
+                    .describedAs("byte in " + (i + 1) + " assertion")
+                    .isEqualTo(bytes[i]);
+        }
+    }
+
+    private void assertValidHeaderBeggining(ByteBuf encodedBuffer) {
+        assertNextBytesAreInOrder(encodedBuffer,
+                MARKER_BYTE,
+                SUPPORTED_VERSION_MAJOR,
+                SUPPORTED_VERSION_MINOR);
+    }
+
+    private void assertBufferSizeIs(ByteBuf encodedBuffer, int headerSize) {
+        assertThat(encodedBuffer.readableBytes()).describedAs("buffer's readable bytes").isEqualTo(headerSize);
+    }
+
+    private void skipReservedBytes(ByteBuf encodedBuffer) {
+        encodedBuffer.readBytes(RESERVED_BYTES_COUNT);
+    }
+
+    private void assertAllBytesVerified(ByteBuf encodedBuffer) {
+        assertThat(encodedBuffer.readableBytes())
+                .describedAs("all bytes should've been asserted")
+                .isEqualTo(0);
+    }
+
+    private byte[] intToBytes(int integer) {
+        return ByteBuffer.allocate(4).putInt(integer).array();
+    }
 }