import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
open class KafkaMessagePrioritizationConsumer(
- private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+ private val bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService,
private val kafkaMessagePrioritizationService: MessagePrioritizationService
) {
return object : KafkaStreamConsumerFunction {
val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
- ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+ ?: throw BlueprintProcessorException("failed to get kafka consumer configuration")
override suspend fun createTopology(
messageConsumerProperties: MessageConsumerProperties,
suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
- ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+ ?: throw BlueprintProcessorException("failed to get kafka consumer configuration")
streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector)