import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.vavr.control.Try;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) {
final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator);
- final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(allocator);
+ final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
return Flux.from(messages)
- .map(protobufEncoder::encode)
- .map(wireFrameEncoder::encode);
+ .map(protobufEncoder::encode)
+ .map(wireFrameEncoder::encode)
+ .map(Try::get);
}
}
public class EncodersFactory {
- public ProtobufEncoder createProtobufEncoder(ByteBufAllocator allocator) {
- return new ProtobufEncoder(allocator);
+ public ProtobufEncoder createProtobufEncoder() {
+ return new ProtobufEncoder();
}
public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator) {
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
+
+import java.nio.ByteBuffer;
+
+public enum PayloadType {
+ UNDEFINED(new byte[]{0x00, 0x00}),
+ PROTOBUF(new byte[]{0x00, 0x01});
+
+ private final byte[] payloadTypeBytes;
+
+ PayloadType(byte[] payloadTypeBytes) {
+ this.payloadTypeBytes = payloadTypeBytes;
+ }
+
+ public ByteBuffer getPayloadTypeBytes() {
+ return ByteBuffer.wrap(payloadTypeBytes).asReadOnlyBuffer();
+ }
+}
*/
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import org.onap.ves.VesEventOuterClass.VesEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+
/**
* @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
*/
public class ProtobufEncoder {
private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufEncoder.class);
- private final ByteBufAllocator allocator;
-
- public ProtobufEncoder(ByteBufAllocator allocator) {
- this.allocator = allocator;
- }
- public ByteBuf encode(VesEvent event) {
+ public ByteBuffer encode(VesEvent event) {
LOGGER.debug("Encoding VesEvent '{}'", event);
- return allocator.buffer().writeBytes(event.toByteArray());
+ return event.toByteString().asReadOnlyByteBuffer();
}
}
* limitations under the License.
* ============LICENSE_END=========================================================
*/
+
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.vavr.control.Try;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
/**
* @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
*/
public class WireFrameEncoder {
+ private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class);
+ private static final short MARKER_BYTE = 0xAA;
+ private static final short SUPPORTED_VERSION_MAJOR = 0x01;
+ private static final short SUPPORTED_VERSION_MINOR = 0x00;
+ private static final int RESERVED_BYTES_COUNT = 3;
+ private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker
+ 2 * Byte.BYTES + // single byte fields (versions)
+ RESERVED_BYTES_COUNT * Byte.BYTES + // reserved bytes
+ 1 * Short.BYTES + // paylaod type
+ 1 * Integer.BYTES; // payload length
private final ByteBufAllocator allocator;
this.allocator = allocator;
}
- public ByteBuf encode(ByteBuf payload) {
- final ByteBuf encodedMessage = allocator.buffer();
- encodedMessage.writeByte(0xAA);
- encodedMessage.writeBytes(payload);
- encodedMessage.writeByte(0x0a);
+ public Try<ByteBuf> encode(ByteBuffer payload) {
+ return Try.of(() -> encodeMessageAs(payload, PayloadType.PROTOBUF))
+ .onFailure((ex) -> LOGGER.warn("Failed to encode payload", ex));
+ }
+
+ private ByteBuf encodeMessageAs(ByteBuffer payload, PayloadType payloadType) throws WTPEncodingException {
+ if (payload == null) {
+ throw new WTPEncodingException("Payload is null");
+ }
+
+ final int payloadSize = payload.remaining();
+ if (payloadSize == 0) {
+ throw new WTPEncodingException("Payload is empty");
+ }
+
+ final ByteBuf encodedMessage = allocator.buffer(HEADER_SIZE + payloadSize);
+ writeBasicWTPFrameHeaderBeginning(encodedMessage);
+ writePayloadMessageHeaderEnding(encodedMessage, payloadType, payload, payloadSize);
return encodedMessage;
}
+
+ private void writeBasicWTPFrameHeaderBeginning(ByteBuf encodedMessage) {
+ encodedMessage.writeByte(MARKER_BYTE);
+ encodedMessage.writeByte(SUPPORTED_VERSION_MAJOR);
+ encodedMessage.writeByte(SUPPORTED_VERSION_MINOR);
+ encodedMessage.writeZero(RESERVED_BYTES_COUNT);
+ }
+
+ private void writePayloadMessageHeaderEnding(ByteBuf encodedMessage,
+ PayloadType payloadType,
+ ByteBuffer payload,
+ int payloadSize) {
+ encodedMessage.writeBytes(payloadType.getPayloadTypeBytes());
+ encodedMessage.writeInt(payloadSize);
+ encodedMessage.writeBytes(payload);
+ }
}
+
+
+class WTPEncodingException extends RuntimeException {
+ WTPEncodingException(String message) {
+ super(message);
+ }
+}
\ No newline at end of file
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.vavr.control.Try;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
import org.onap.ves.VesEventOuterClass.VesEvent;
import reactor.core.publisher.Flux;
+import java.nio.ByteBuffer;
+
/**
* @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
*/
public void encode_should_encode_message_stream_to_wire_frame() {
final WireFrameEncoder wireFrameEncoder = mock(WireFrameEncoder.class);
final ProtobufEncoder protobufEncoder = mock(ProtobufEncoder.class);
- final ByteBuf protoBuffer = Unpooled.copiedBuffer(new byte[3]);
- final ByteBuf wireFrameBuffer = Unpooled.copiedBuffer(new byte[5]);
+ final ByteBuffer protoBuffer = ByteBuffer.wrap(new byte[3]);
+ final Try<ByteBuf> wireFrameBuffer = Try.success(Unpooled.copiedBuffer(new byte[5]));
when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer);
when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer);
- when(encodersFactoryMock.createProtobufEncoder(ByteBufAllocator.DEFAULT)).thenReturn(protobufEncoder);
+ when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder);
when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT)).thenReturn(wireFrameEncoder);
// given
final ByteBuf lastMessage = producerCore.encode(messages, ByteBufAllocator.DEFAULT).blockLast();
// then
- verify(encodersFactoryMock).createProtobufEncoder(ByteBufAllocator.DEFAULT);
+ verify(encodersFactoryMock).createProtobufEncoder();
verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT);
verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class));
verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer);
assertThat(lastMessage).isNotNull();
- assertThat(lastMessage).isEqualTo(wireFrameBuffer);
+ assertThat(lastMessage).isEqualTo(wireFrameBuffer.get());
}
}
* limitations under the License.
* ============LICENSE_END=========================================================
*/
+
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
import static org.assertj.core.api.Assertions.assertThat;
@Test
public void factory_methods_should_create_non_null_encoders_objects() {
// when
- final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder(ByteBufAllocator.DEFAULT);
+ final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT);
// then
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.utils.VesEvents;
import org.onap.ves.VesEventOuterClass.VesEvent;
+import java.nio.ByteBuffer;
+
public class ProtobufEncoderTest {
- private final ProtobufEncoder protobufEncoder = new ProtobufEncoder(ByteBufAllocator.DEFAULT);
+ private final ProtobufEncoder protobufEncoder = new ProtobufEncoder();
@Test
void todo() {
final VesEvent message = VesEvents.defaultVesEvent();
// when
- final ByteBuf encodedMessage = protobufEncoder.encode(message);
+ final ByteBuffer encodedMessage = protobufEncoder.encode(message);
// then
- assertThat(encodedMessage.readableBytes()).isGreaterThan(0);
+ assertThat(encodedMessage.remaining()).isGreaterThan(0);
}
}
* limitations under the License.
* ============LICENSE_END=========================================================
*/
+
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
import static org.assertj.core.api.Assertions.assertThat;
+import io.vavr.control.Try;
+import org.junit.jupiter.api.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
public class WireFrameEncoderTest {
+ private static final byte MARKER_BYTE = (byte) 0xAA;
+ private static final byte SUPPORTED_VERSION_MAJOR = (byte) 0x01;
+ private static final byte SUPPORTED_VERSION_MINOR = (byte) 0x00;
+ private static final int RESERVED_BYTES_COUNT = 3;
+ private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker
+ 2 * Byte.BYTES + // single byte fields (versions)
+ RESERVED_BYTES_COUNT * java.lang.Byte.BYTES + // reserved bytes
+ 1 * Short.BYTES + // paylaod type
+ 1 * Integer.BYTES; // payload length
private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT);
@Test
- void todo() {
- // given
- final ByteBuf buffer = Unpooled.buffer(0);
+ void encode_givenNullPayload_shouldThrowEncodingException() {
+ final ByteBuffer buffer = null;
- // when
- final ByteBuf encodedBuffer = wireFrameEncoder.encode(buffer);
+ Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
- // then
- assertThat(encodedBuffer.readableBytes()).isGreaterThan(0);
+ assertThat(encodedBuffer.isFailure()).isTrue();
+ assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
}
+ @Test
+ void encode_givenEmptyPayload_shouldThrowEncodingException() {
+ final ByteBuffer buffer = ByteBuffer.allocateDirect(0);
+
+ Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+
+ assertThat(encodedBuffer.isFailure()).isTrue();
+ assertThat(encodedBuffer.getCause()).isInstanceOf(WTPEncodingException.class);
+ }
+
+ @Test
+ void encode_givenSomePayloadBytes_shouldCreateValidGPBFrameWithPayloadAtTheEnd() {
+ final byte[] payloadBytes = new byte[]{0x1A, 0x2B, 0x3C};
+ final int bufferSize = payloadBytes.length;
+ final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes);
+
+ final Try<ByteBuf> encodedBuffer = wireFrameEncoder.encode(buffer);
+
+ assertThat(encodedBuffer.isSuccess()).isTrue();
+ final ByteBuf actualEncodedBuffer = encodedBuffer.get();
+ assertBufferSizeIs(actualEncodedBuffer, HEADER_SIZE + bufferSize);
+ assertValidHeaderBeggining(actualEncodedBuffer);
+ skipReservedBytes(actualEncodedBuffer);
+ assertNextBytesAreInOrder(actualEncodedBuffer, (byte) 0x00, (byte) 0x01);
+ assertNextBytesAreInOrder(actualEncodedBuffer, intToBytes(bufferSize));
+ assertNextBytesAreInOrder(actualEncodedBuffer, payloadBytes);
+ assertAllBytesVerified(actualEncodedBuffer);
+ }
+
+ private void assertNextBytesAreInOrder(ByteBuf encodedBuffer, byte... bytes) {
+ for (int i = 0; i < bytes.length; i++) {
+ assertThat(encodedBuffer.readByte())
+ .describedAs("byte in " + (i + 1) + " assertion")
+ .isEqualTo(bytes[i]);
+ }
+ }
+
+ private void assertValidHeaderBeggining(ByteBuf encodedBuffer) {
+ assertNextBytesAreInOrder(encodedBuffer,
+ MARKER_BYTE,
+ SUPPORTED_VERSION_MAJOR,
+ SUPPORTED_VERSION_MINOR);
+ }
+
+ private void assertBufferSizeIs(ByteBuf encodedBuffer, int headerSize) {
+ assertThat(encodedBuffer.readableBytes()).describedAs("buffer's readable bytes").isEqualTo(headerSize);
+ }
+
+ private void skipReservedBytes(ByteBuf encodedBuffer) {
+ encodedBuffer.readBytes(RESERVED_BYTES_COUNT);
+ }
+
+ private void assertAllBytesVerified(ByteBuf encodedBuffer) {
+ assertThat(encodedBuffer.readableBytes())
+ .describedAs("all bytes should've been asserted")
+ .isEqualTo(0);
+ }
+
+ private byte[] intToBytes(int integer) {
+ return ByteBuffer.allocate(4).putInt(integer).array();
+ }
}