2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
19 import io.mockk.coEvery
22 import kotlinx.coroutines.delay
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.junit.Before
26 import org.junit.runner.RunWith
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
34 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
35 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
36 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
37 import org.onap.ccsdk.cds.controllerblueprints.core.logger
38 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
39 import org.springframework.beans.factory.annotation.Autowired
40 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
41 import org.springframework.context.ApplicationContext
42 import org.springframework.test.annotation.DirtiesContext
43 import org.springframework.test.context.ContextConfiguration
44 import org.springframework.test.context.TestPropertySource
45 import org.springframework.test.context.junit4.SpringRunner
46 import kotlin.test.Test
47 import kotlin.test.assertNotNull
48 import kotlin.test.assertTrue
50 @RunWith(SpringRunner::class)
53 @ContextConfiguration(
54 classes = [BluePrintMessageLibConfiguration::class,
55 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
56 MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
61 "spring.jpa.show-sql=true",
62 "spring.jpa.properties.hibernate.show_sql=true",
63 "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
65 "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
66 "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
67 "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
68 "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
70 // To send initial test message
71 "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
72 "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
73 "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
76 open class MessagePrioritizationConsumerTest {
78 private val log = logger(MessagePrioritizationConsumerTest::class)
81 lateinit var applicationContext: ApplicationContext
84 lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
87 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
90 lateinit var messagePrioritizationService: MessagePrioritizationService
93 lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
96 lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
100 BluePrintDependencyService.inject(applicationContext)
104 fun testBluePrintKafkaJDBCKeyStore() {
106 assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
108 val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
109 .instance(MessagePrioritizationStateService::class)
110 assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
112 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
113 val message = messagePrioritizationService.saveMessage(it)
114 val repoResult = messagePrioritizationService.getMessage(message.id)
115 assertNotNull(repoResult, "failed to get inserted message.")
121 fun testMessagePrioritizationService() {
124 ::messagePrioritizationService.isInitialized,
125 "failed to initialize messagePrioritizationService"
128 log.info("**************** without Correlation **************")
129 /** Checking without correlation */
130 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
131 messagePrioritizationService.prioritize(it)
133 log.info("**************** Same Group , with Correlation **************")
134 /** checking same group with correlation */
135 MessagePrioritizationSample
136 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
139 messagePrioritizationService.prioritize(it)
141 log.info("**************** Different Type , with Correlation **************")
142 /** checking different type, with correlation */
143 MessagePrioritizationSample
144 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
147 messagePrioritizationService.prioritize(it)
153 fun testStartConsuming() {
155 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
157 val streamingConsumerService = bluePrintMessageLibPropertyService
158 .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
159 assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
161 val spyStreamingConsumerService = spyk(streamingConsumerService)
162 coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
163 coEvery { spyStreamingConsumerService.shutDown() } returns Unit
164 val messagePrioritizationConsumer = MessagePrioritizationConsumer(
165 bluePrintMessageLibPropertyService
167 val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
170 val kafkaStreamConsumerFunction =
171 spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
172 val messageConsumerProperties = bluePrintMessageLibPropertyService
173 .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
174 val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
175 assertNotNull(topology, "failed to get create topology")
177 every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
178 spyMessagePrioritizationConsumer.startConsuming(configuration)
179 spyMessagePrioritizationConsumer.shutDown()
184 fun testSchedulerService() {
186 val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
188 ::messagePrioritizationSchedulerService.isInitialized,
189 "failed to initialize messagePrioritizationSchedulerService"
192 messagePrioritizationSchedulerService.startScheduling(configuration)
195 /** To debug increase the delay time */
197 messagePrioritizationSchedulerService.shutdownScheduling(configuration)
202 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
204 fun testMessagePrioritizationConsumer() {
206 messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
208 /** Send sample message with every 1 sec */
209 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
210 .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
212 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
214 val headers: MutableMap<String, String> = hashMapOf()
215 headers["id"] = it.id
216 blueprintMessageProducerService.sendMessageNB(
217 message = it.asJsonString(false),
222 MessagePrioritizationSample
223 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
226 val headers: MutableMap<String, String> = hashMapOf()
227 headers["id"] = it.id
228 blueprintMessageProducerService.sendMessageNB(
229 message = it.asJsonString(false),
234 MessagePrioritizationSample
235 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
238 val headers: MutableMap<String, String> = hashMapOf()
239 headers["id"] = it.id
240 blueprintMessageProducerService.sendMessageNB(
241 message = it.asJsonString(false),
247 messagePrioritizationConsumer.shutDown()