Use consumers in main
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-kafka-consumer / src / main / kotlin / org / onap / dcae / collectors / veshv / kafkaconsumer / main.kt
index 7e77bae..9bf4310 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.kafkaconsumer
 
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory
 import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration
 import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.OffsetKafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.ProcessingKafkaConsumer
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer
 import org.onap.dcae.collectors.veshv.utils.process.ExitCode
@@ -37,6 +42,20 @@ fun main(args: Array<String>): Unit =
 
 
 private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess {
+    val kafkaConsumer = KafkaConsumer<ByteArray, ByteArray>(KafkaPropertiesFactory.create(
+            config.kafkaBootstrapServers)
+    )
+
+        runBlocking {
+            if (config.disableProcessing) {
+                OffsetKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+                        .start()
+            } else {
+                ProcessingKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+                        .start()
+            }
+        }
+
     PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE)
             .start().block()!!.await().block() // TODO refactor netty server logic