package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import io.micrometer.core.instrument.MeterRegistry
-import io.mockk.coEvery
-import io.mockk.every
-import io.mockk.mockk
-import io.mockk.spyk
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
}
}
- @Test
- fun testStartConsuming() {
- runBlocking {
- val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
-
- val streamingConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
- assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
-
- val spyStreamingConsumerService = spyk(streamingConsumerService)
- coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
- coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService, mockk()
- )
- val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
-
- // Test Topology
- val kafkaStreamConsumerFunction =
- spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
- val messageConsumerProperties = bluePrintMessageLibPropertyService
- .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
- val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
- assertNotNull(topology, "failed to get create topology")
-
- every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
- spyMessagePrioritizationConsumer.startConsuming(configuration)
- spyMessagePrioritizationConsumer.shutDown()
- }
- }
-
@Test
fun testSchedulerService() {
runBlocking {