Fix shutting down when new config received bug
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / wire / WireChunkDecoder.kt
index b735138..ca9d28a 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.wire
 
-import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
 import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Flux.defer
 import reactor.core.publisher.SynchronousSink
@@ -63,26 +62,22 @@ internal class WireChunkDecoder(
     private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
         decoder.decodeFirst(streamBuffer)
                 .fold(onError(next), onSuccess(next))
-                .unsafeRunSync()
     }
 
-    private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+    private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> Unit = { err ->
         when (err) {
-            is InvalidWireFrame -> IO {
+            is InvalidWireFrame ->
                 next.error(WireFrameException(err))
-            }
-            is MissingWireFrameBytes -> IO {
+            is MissingWireFrameBytes -> {
                 logEndOfData()
                 next.complete()
             }
         }
     }
 
-    private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
-        IO {
-            logDecodedWireMessage(frame)
-            next.next(frame)
-        }
+    private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> Unit = { frame ->
+        logDecodedWireMessage(frame)
+        next.next(frame)
     }
 
     private fun logIncomingMessage(wire: ByteBuf) {