Prioritization expiry and clean scheduler service
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / kafka / MessagePrioritizationConsumer.kt
index d7666a2..fb7cfd1 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
 
@@ -46,6 +47,9 @@ open class MessagePrioritizationConsumer(
         KafkaStreamConsumerFunction {
         return object : KafkaStreamConsumerFunction {
 
+            val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+                ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
             override suspend fun createTopology(
                 messageConsumerProperties: MessageConsumerProperties,
                 additionalConfig: Map<String, Any>?
@@ -72,7 +76,7 @@ open class MessagePrioritizationConsumer(
                 /** To receive completed and error messages */
                 topology.addSink(
                     MessagePrioritizationConstants.SINK_OUTPUT,
-                    prioritizationConfiguration.outputTopic,
+                    kafkaConsumerConfiguration.outputTopic,
                     Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
                     MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                 )
@@ -84,7 +88,11 @@ open class MessagePrioritizationConsumer(
     }
 
     suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
-        streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)
+
+        val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+            ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
+        streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector)
 
         // Dynamic Consumer Function to create Topology
         val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)