From 40c5abeac588ca6c13477675960c94a97dcdeb15 Mon Sep 17 00:00:00 2001 From: fkrzywka Date: Tue, 17 Jul 2018 07:54:58 +0200 Subject: [PATCH] Use Try/Option monad when decoding protobuf Closes ONAP-143 Change-Id: I33cb2d24cd5962318a6f405096db298bbdbab963 Signed-off-by: fkrzywka Issue-ID: DCAEGEN2-601 --- .../onap/dcae/collectors/veshv/impl/VesDecoder.kt | 11 +++--- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 13 ++++---- .../dcae/collectors/veshv/impl/VesDecoderTest.kt | 39 +++++++++++++++++----- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index 591a48b7..a7780109 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Try +import arrow.core.Option import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventV5.VesEvent @@ -29,8 +31,9 @@ import org.onap.ves.VesEventV5.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): VesMessage { - val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader - return VesMessage(decodedHeader, bytes) - } + fun decode(bytes: ByteData): Option = + Try { + val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader + VesMessage(decodedHeader, bytes) + }.toOption() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 2a07b9b8..52689162 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.SynchronousSink @@ -71,19 +72,19 @@ internal class VesHvCollector( private fun decodePayload(flux: Flux): Flux = flux .map(PayloadWireFrameMessage::payload) .map(protobufDecoder::decode) - + .flatMap { omitWhenNone(it) } private fun routeMessage(flux: Flux): Flux = flux .flatMap(this::findRoute) .compose(sink::send) .doOnNext { metrics.notifyMessageSent(it.topic) } - private fun findRoute(msg: VesMessage): Mono = omitWhenNull(msg, router::findDestination) - private fun omitWhenNull(input: T, mapper: (T) -> Option): Mono = - mapper(input).fold( - { Mono.empty() }, - { Mono.just(it) }) + private fun findRoute(msg: VesMessage): Mono = omitWhenNone((router::findDestination)(msg)) + + private fun omitWhenNone(it: Option): Mono = it.fold( + { Mono.empty() }, + { Mono.just(it) }) private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index 90b34b1c..3f1f610e 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -19,10 +19,8 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Option import com.google.protobuf.ByteString -import com.google.protobuf.InvalidProtocolBufferException -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it @@ -33,6 +31,8 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import java.nio.charset.Charset +import kotlin.test.assertTrue +import kotlin.test.fail internal object VesDecoderTest : Spek({ @@ -41,22 +41,24 @@ internal object VesDecoderTest : Spek({ val cut = VesDecoder() on("ves hv message bytes") { - val commonHeader = CommonEventHeader.getDefaultInstance() + val commonHeader = commonEventHeader() val msg = VesEvent.newBuilder() .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements")) .build() val rawMessageBytes = msg.toByteData() - it("should decode only header and pass it on along with raw message") { val expectedMessage = VesMessage( commonHeader, rawMessageBytes ) - assertThat(cut.decode(rawMessageBytes)).isEqualTo(expectedMessage) - + assertTrue { + cut.decode(rawMessageBytes).exists { + it == expectedMessage + } + } } } @@ -64,9 +66,28 @@ internal object VesDecoderTest : Spek({ val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset())) it("should throw error") { - assertThatExceptionOfType(InvalidProtocolBufferException::class.java) - .isThrownBy { cut.decode(rawMessageBytes) } + assertFailedWithError(cut.decode(rawMessageBytes)) } } } }) + +private fun assertFailedWithError(option: Option) = + option.exists { + fail("Error expected") + } + + +private fun commonEventHeader() = + CommonEventHeader.getDefaultInstance().toBuilder() + .setDomain(CommonEventHeader.Domain.HEARTBEAT) + .setVersion("1.0") + .setEventName("xyz") + .setEventId("eventID") + .setEventName("Sample event name") + .setSourceName("Sample Source") + .setPriority(CommonEventHeader.Priority.MEDIUM) + .setStartEpochMicrosec(120034455) + .setLastEpochMicrosec(120034459) + .setSequence(1) + .build() -- 2.16.6