import io.mockk.coEvery
import io.mockk.every
+import io.mockk.mockk
import io.mockk.spyk
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
-import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
- classes = [BluePrintMessageLibConfiguration::class,
+ classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
)
@TestPropertySource(
properties =
[
- "spring.jpa.show-sql=true",
- "spring.jpa.properties.hibernate.show_sql=true",
+ "spring.jpa.show-sql=false",
+ "spring.jpa.properties.hibernate.show_sql=false",
"spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
"blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
// To send initial test message
"blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+
+ "blueprintsprocessor.nats.cds-controller.type=token-auth",
+ "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+ "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
]
)
open class MessagePrioritizationConsumerTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Autowired
- lateinit var messagePrioritizationService: MessagePrioritizationService
-
- @Autowired
- lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
+ lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
@Autowired
- lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+ lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
@Before
fun setup() {
@Test
fun testMessagePrioritizationService() {
runBlocking {
- assertTrue(
- ::messagePrioritizationService.isInitialized,
- "failed to initialize messagePrioritizationService"
- )
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
log.info("**************** without Correlation **************")
/** Checking without correlation */
val spyStreamingConsumerService = spyk(streamingConsumerService)
coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService, mockk()
)
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
@Test
fun testSchedulerService() {
runBlocking {
- val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
- assertTrue(
- ::messagePrioritizationSchedulerService.isInitialized,
- "failed to initialize messagePrioritizationSchedulerService"
- )
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationSchedulerService =
+ MessagePrioritizationSchedulerService(messagePrioritizationService)
launch {
- messagePrioritizationSchedulerService.startScheduling(configuration)
+ messagePrioritizationSchedulerService.startScheduling()
}
launch {
/** To debug increase the delay time */
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
- fun testMessagePrioritizationConsumer() {
+ fun testKafkaMessagePrioritizationConsumer() {
runBlocking {
- messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val kafkaMessagePrioritizationService =
+ SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
+ kafkaMessagePrioritizationService.setConfiguration(configuration)
+
+ val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
+ messagePrioritizationStateService,
+ kafkaMessagePrioritizationService
+ )
+
+ // Register the processor
+ BluePrintDependencyService.registerSingleton(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ defaultMessagePrioritizeProcessor
+ )
+
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService,
+ kafkaMessagePrioritizationService
+ )
+ messagePrioritizationConsumer.startConsuming(configuration)
/** Send sample message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
messagePrioritizationConsumer.shutDown()
}
}
+
+ /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
+ * Start :
+ * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+ * */
+ // @Test
+ fun testNatsMessagePrioritizationConsumer() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
+
+ val inputSubject =
+ NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
+
+ val natsMessagePrioritizationService =
+ SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
+ natsMessagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationConsumer =
+ NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
+ messagePrioritizationConsumer.startConsuming()
+
+ /** Send sample message with every 1 sec */
+ val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
+
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(200)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+ }
+ delay(3000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
}