Use Try/Option monad when decoding protobuf 39/58839/1
authorfkrzywka <filip.krzywka@nokia.com>
Tue, 17 Jul 2018 05:54:58 +0000 (07:54 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 3 Aug 2018 05:18:32 +0000 (07:18 +0200)
Closes ONAP-143

Change-Id: I33cb2d24cd5962318a6f405096db298bbdbab963
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt

index 591a48b..a778010 100644 (file)
@@ -19,6 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
+import arrow.core.Try
+import arrow.core.Option
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventV5.VesEvent
@@ -29,8 +31,9 @@ import org.onap.ves.VesEventV5.VesEvent
  */
 internal class VesDecoder {
 
-    fun decode(bytes: ByteData): VesMessage {
-        val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
-        return VesMessage(decodedHeader, bytes)
-    }
+    fun decode(bytes: ByteData): Option<VesMessage> =
+            Try {
+                val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+                VesMessage(decodedHeader, bytes)
+            }.toOption()
 }
index 2a07b9b..5268916 100644 (file)
@@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.core.publisher.SynchronousSink
@@ -71,19 +72,19 @@ internal class VesHvCollector(
     private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
             .map(PayloadWireFrameMessage::payload)
             .map(protobufDecoder::decode)
-
+            .flatMap { omitWhenNone(it) }
 
     private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
             .flatMap(this::findRoute)
             .compose(sink::send)
             .doOnNext { metrics.notifyMessageSent(it.topic) }
 
-    private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
 
-    private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
-            mapper(input).fold(
-                    { Mono.empty() },
-                    { Mono.just(it) })
+    private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
+
+    private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
+            { Mono.empty() },
+            { Mono.just(it) })
 
     private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
 
index 90b34b1..3f1f610 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl
 
+import arrow.core.Option
 import com.google.protobuf.ByteString
-import com.google.protobuf.InvalidProtocolBufferException
-import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.assertThatExceptionOfType
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
@@ -33,6 +31,8 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventV5.VesEvent
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import java.nio.charset.Charset
+import kotlin.test.assertTrue
+import kotlin.test.fail
 
 
 internal object VesDecoderTest : Spek({
@@ -41,22 +41,24 @@ internal object VesDecoderTest : Spek({
         val cut = VesDecoder()
 
         on("ves hv message bytes") {
-            val commonHeader = CommonEventHeader.getDefaultInstance()
+            val commonHeader = commonEventHeader()
             val msg = VesEvent.newBuilder()
                     .setCommonEventHeader(commonHeader)
                     .setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements"))
                     .build()
             val rawMessageBytes = msg.toByteData()
 
-
             it("should decode only header and pass it on along with raw message") {
                 val expectedMessage = VesMessage(
                         commonHeader,
                         rawMessageBytes
                 )
 
-                assertThat(cut.decode(rawMessageBytes)).isEqualTo(expectedMessage)
-
+                assertTrue {
+                    cut.decode(rawMessageBytes).exists {
+                        it == expectedMessage
+                    }
+                }
             }
         }
 
@@ -64,9 +66,28 @@ internal object VesDecoderTest : Spek({
             val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
 
             it("should throw error") {
-                assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
-                        .isThrownBy { cut.decode(rawMessageBytes) }
+                assertFailedWithError(cut.decode(rawMessageBytes))
             }
         }
     }
 })
+
+private fun <A> assertFailedWithError(option: Option<A>) =
+        option.exists {
+            fail("Error expected")
+        }
+
+
+private fun commonEventHeader() =
+        CommonEventHeader.getDefaultInstance().toBuilder()
+                .setDomain(CommonEventHeader.Domain.HEARTBEAT)
+                .setVersion("1.0")
+                .setEventName("xyz")
+                .setEventId("eventID")
+                .setEventName("Sample event name")
+                .setSourceName("Sample Source")
+                .setPriority(CommonEventHeader.Priority.MEDIUM)
+                .setStartEpochMicrosec(120034455)
+                .setLastEpochMicrosec(120034459)
+                .setSequence(1)
+                .build()