/*
* Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-import io.mockk.coEvery
-import io.mockk.every
-import io.mockk.mockk
-import io.mockk.spyk
+import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
+import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.context.ApplicationContext
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
@Autowired
lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+ @MockBean
+ lateinit var meterRegistry: MeterRegistry
+
@Before
fun setup() {
BluePrintDependencyService.inject(applicationContext)
}
}
- @Test
- fun testStartConsuming() {
- runBlocking {
- val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
-
- val streamingConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
- assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
-
- val spyStreamingConsumerService = spyk(streamingConsumerService)
- coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
- coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService, mockk()
- )
- val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
-
- // Test Topology
- val kafkaStreamConsumerFunction =
- spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
- val messageConsumerProperties = bluePrintMessageLibPropertyService
- .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
- val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
- assertNotNull(topology, "failed to get create topology")
-
- every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
- spyMessagePrioritizationConsumer.startConsuming(configuration)
- spyMessagePrioritizationConsumer.shutDown()
- }
- }
-
@Test
fun testSchedulerService() {
runBlocking {