}
/** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
- suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?,
- consumerFunction: ConsumerFunction) {
+ suspend fun consume(
+ topics: List<String>,
+ additionalConfig: Map<String, Any>?,
+ consumerFunction: ConsumerFunction
+ ) {
throw BluePrintProcessorException("Not Implemented")
}
/** close the channel, consumer and other resources */
suspend fun shutDown()
}
+
/** Consumer dynamic implementation interface */
interface KafkaConsumerRecordsFunction : ConsumerFunction {
- suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>,
- consumerRecords: ConsumerRecords<*, *>)
+
+ suspend fun invoke(
+ messageConsumerProperties: MessageConsumerProperties,
+ consumer: Consumer<*, *>,
+ consumerRecords: ConsumerRecords<*, *>
+ )
}
interface KafkaStreamConsumerFunction : ConsumerFunction {
- suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
- additionalConfig: Map<String, Any>?): Topology
-}
\ No newline at end of file
+ suspend fun createTopology(
+ messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?
+ ): Topology
+}