import kotlinx.coroutines.runBlocking
import org.junit.Before
import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
-
+import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
-@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BlueprintPropertyConfiguration::class, BluePrintProperties::class,
- MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class])
-@TestPropertySource(properties =
-[
- "spring.jpa.show-sql=true",
- "spring.jpa.properties.hibernate.show_sql=true",
- "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
-
- "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
- "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
- "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
-
- // To send initial test message
- "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
- "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
-])
+@ContextConfiguration(
+ classes = [BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
+ MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
+)
+@TestPropertySource(
+ properties =
+ [
+ "spring.jpa.show-sql=true",
+ "spring.jpa.properties.hibernate.show_sql=true",
+ "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
+
+ "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
+ "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+
+ // To send initial test message
+ "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+ ]
+)
open class MessagePrioritizationConsumerTest {
+ private val log = logger(MessagePrioritizationConsumerTest::class)
+
@Autowired
lateinit var applicationContext: ApplicationContext
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+ @Autowired
+ lateinit var messagePrioritizationService: MessagePrioritizationService
+
+ @Autowired
+ lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+
@Before
fun setup() {
BluePrintDependencyService.inject(applicationContext)
assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
- .instance(MessagePrioritizationStateService::class)
+ .instance(MessagePrioritizationStateService::class)
assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
}
}
+ @Test
+ fun testMessagePrioritizationService() {
+ runBlocking {
+ assertTrue(
+ ::messagePrioritizationService.isInitialized,
+ "failed to initialize messagePrioritizationService"
+ )
+
+ log.info("**************** without Correlation **************")
+ /** Checking without correlation */
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Same Group , with Correlation **************")
+ /** checking same group with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Different Type , with Correlation **************")
+ /** checking different type, with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ }
+ }
+
@Test
fun testStartConsuming() {
runBlocking {
val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
val streamingConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService(configuration.inputTopicSelector)
+ .blueprintMessageConsumerService(configuration.inputTopicSelector)
assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
val spyStreamingConsumerService = spyk(streamingConsumerService)
coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+ val messagePrioritizationConsumer = MessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService
+ )
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
-
// Test Topology
- val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+ val kafkaStreamConsumerFunction =
+ spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
val messageConsumerProperties = bluePrintMessageLibPropertyService
- .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
+ .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
assertNotNull(topology, "failed to get create topology")
}
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
- //@Test
+ // @Test
fun testMessagePrioritizationConsumer() {
runBlocking {
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
/** Send sample message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
launch {
- MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
delay(100)
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.id
- blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
- headers = headers)
+ blueprintMessageProducerService.sendMessageNB(
+ message = it.asJsonString(false),
+ headers = headers
+ )
}
MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
- .forEach {
- delay(100)
- val headers: MutableMap<String, String> = hashMapOf()
- headers["id"] = it.id
- blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
- headers = headers)
- }
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
MessagePrioritizationSample
- .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
- .forEach {
- delay(2000)
- val headers: MutableMap<String, String> = hashMapOf()
- headers["id"] = it.id
- blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
- headers = headers)
- }
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(2000)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
}
delay(10000)
messagePrioritizationConsumer.shutDown()
}
}
-}
\ No newline at end of file
+}