Generate VesEvents in hv-ves/message-generator
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ves-message-generator / src / main / kotlin / org / onap / dcae / collectors / veshv / ves / message / generator / api / MessageGenerator.kt
index 076c06b..5f8638f 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package org.onap.dcae.collectors.veshv.ves.message.generator.api
 
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-interface MessageGenerator {
-    fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage>
+abstract class MessageGenerator<K : MessageParameters, T> {
+    abstract fun createMessageFlux(parameters: K): Flux<T>
 
-    companion object {
-        const val FIXED_PAYLOAD_SIZE = 100
+    protected fun repeatMessage(message: Mono<T>, amount: Long): Flux<T> = when {
+        amount < 0 -> repeatForever(message)
+        amount == 0L -> emptyMessageStream()
+        else -> repeatNTimes(message, amount)
     }
+
+    private fun repeatForever(message: Mono<T>) = message.repeat()
+
+    private fun emptyMessageStream() = Flux.empty<T>()
+
+    private fun repeatNTimes(message: Mono<T>, amount: Long) = message.repeat(amount - 1)
 }