*/
package org.onap.dcae.collectors.veshv.kafkaconsumer
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory
import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration
import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.OffsetKafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.ProcessingKafkaConsumer
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer
import org.onap.dcae.collectors.veshv.utils.process.ExitCode
private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess {
+ val kafkaConsumer = KafkaConsumer<ByteArray, ByteArray>(KafkaPropertiesFactory.create(
+ config.kafkaBootstrapServers)
+ )
+
+ runBlocking {
+ if (config.disableProcessing) {
+ OffsetKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+ .start()
+ } else {
+ ProcessingKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+ .start()
+ }
+ }
+
PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE)
.start().block()!!.await().block() // TODO refactor netty server logic