import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.assertNotNull
-
@RunWith(SpringRunner::class)
@DirtiesContext
-@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
-@TestPropertySource(properties =
-[
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+@ContextConfiguration(
+ classes = [BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+)
+@TestPropertySource(
+ properties =
+ [
+ "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
- "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
- "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
- "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+ "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
+ "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
-])
+ ]
+)
class KafkaStreamsBasicAuthConsumerServiceTest {
+
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
}
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
- //@Test
+ // @Test
fun testKafkaStreamingMessageConsumer() {
runBlocking {
val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
// Dynamic Consumer Function to create Topology
val consumerFunction = object : KafkaStreamConsumerFunction {
- override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
- additionalConfig: Map<String, Any>?): Topology {
+ override suspend fun createTopology(
+ messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?
+ ): Topology {
val topology = Topology()
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
as KafkaStreamsBasicAuthConsumerProperties
// Store Buolder
val countStoreSupplier = Stores.keyValueStoreBuilder(
- Stores.persistentKeyValueStore("PriorityMessageState"),
- Serdes.String(),
- PriorityMessageSerde())
- .withLoggingEnabled(changelogConfig)
+ Stores.persistentKeyValueStore("PriorityMessageState"),
+ Serdes.String(),
+ PriorityMessageSerde()
+ )
+ .withLoggingEnabled(changelogConfig)
topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source")
topology.addStateStore(countStoreSupplier, "FirstProcessor")
- topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(),
- PriorityMessageSerde().serializer(), "FirstProcessor")
+ topology.addSink(
+ "SINK", "default-stream-topic-out", Serdes.String().serializer(),
+ PriorityMessageSerde().serializer(), "FirstProcessor"
+ )
return topology
}
}
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
launch {
repeat(5) {
delay(1000)
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.toString()
- blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
- headers = headers)
+ blueprintMessageProducerService.sendMessageNB(
+ message = "this is my message($it)",
+ headers = headers
+ )
}
}
streamingConsumerService.consume(null, consumerFunction)
streamingConsumerService.shutDown()
}
}
-}
\ No newline at end of file
+}