import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
KafkaStreamConsumerFunction {
return object : KafkaStreamConsumerFunction {
+ val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+ ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
override suspend fun createTopology(
messageConsumerProperties: MessageConsumerProperties,
additionalConfig: Map<String, Any>?
/** To receive completed and error messages */
topology.addSink(
MessagePrioritizationConstants.SINK_OUTPUT,
- prioritizationConfiguration.outputTopic,
+ kafkaConsumerConfiguration.outputTopic,
Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
)
}
suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
- streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)
+
+ val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+ ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
+ streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector)
// Dynamic Consumer Function to create Topology
val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)