Merge "Align XNF simulator batch with Netty's queue size"
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 8 Jan 2019 13:33:20 +0000 (13:33 +0000)
committerGerrit Code Review <gerrit@onap.org>
Tue, 8 Jan 2019 13:33:20 +0000 (13:33 +0000)
1  2 
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt

@@@ -35,6 -35,7 +35,7 @@@ import reactor.core.publisher.Mon
  import reactor.core.publisher.ReplayProcessor
  import reactor.netty.NettyOutbound
  import reactor.netty.tcp.TcpClient
+ import reactor.util.concurrent.Queues.XS_BUFFER_SIZE
  
  /**
   * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@@ -43,7 -44,8 +44,7 @@@
  class VesHvClient(private val configuration: SimulatorConfiguration) {
  
      private val client: TcpClient = TcpClient.create()
 -            .host(configuration.vesHost)
 -            .port(configuration.vesPort)
 +            .addressSupplier { configuration.hvVesAddress }
              .configureSsl()
  
      private fun TcpClient.configureSsl() =
                  .handle { _, output -> handler(complete, messages, output) }
                  .connect()
                  .doOnError {
 -                    logger.info {
 -                        "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
 -                    }
 +                    logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" }
                  }
                  .subscribe {
 -                    logger.info {
 -                        "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
 -                    }
 +                    logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" }
                  }
          return complete.then()
      }
@@@ -76,7 -82,7 +77,7 @@@
          val encoder = WireFrameEncoder(allocator)
          val frames = messages
                  .map(encoder::encode)
-                 .window(MAX_BATCH_SIZE)
+                 .window(XS_BUFFER_SIZE)
  
          return nettyOutbound
                  .logConnectionClosed()
  
      companion object {
          private val logger = Logger(VesHvClient::class)
-         private const val MAX_BATCH_SIZE = 128
      }
  }