import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DataJpaTest
)
open class MessagePrioritizationConsumerTest {
+ private val log = logger(MessagePrioritizationConsumerTest::class)
+
@Autowired
lateinit var applicationContext: ApplicationContext
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+ @Autowired
+ lateinit var messagePrioritizationService: MessagePrioritizationService
+
@Autowired
lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
}
}
+ @Test
+ fun testMessagePrioritizationService() {
+ runBlocking {
+ assertTrue(
+ ::messagePrioritizationService.isInitialized,
+ "failed to initialize messagePrioritizationService"
+ )
+
+ log.info("**************** without Correlation **************")
+ /** Checking without correlation */
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Same Group , with Correlation **************")
+ /** checking same group with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Different Type , with Correlation **************")
+ /** checking different type, with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ }
+ }
+
@Test
fun testStartConsuming() {
runBlocking {
val spyStreamingConsumerService = spyk(streamingConsumerService)
coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+ val messagePrioritizationConsumer = MessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService
+ )
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Test Topology