2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
19 import io.mockk.coEvery
22 import kotlinx.coroutines.delay
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.junit.Before
26 import org.junit.runner.RunWith
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
34 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
35 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
36 import org.onap.ccsdk.cds.controllerblueprints.core.logger
37 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
38 import org.springframework.beans.factory.annotation.Autowired
39 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
40 import org.springframework.context.ApplicationContext
41 import org.springframework.test.annotation.DirtiesContext
42 import org.springframework.test.context.ContextConfiguration
43 import org.springframework.test.context.TestPropertySource
44 import org.springframework.test.context.junit4.SpringRunner
45 import kotlin.test.Test
46 import kotlin.test.assertNotNull
47 import kotlin.test.assertTrue
/**
 * Spring-runner based test for the message prioritization consumer and the services around it.
 *
 * The test context is built from the configuration classes listed in the context configuration
 * below. The inline properties configure a `prioritize-input` message consumer and a
 * `prioritize-input` message producer, both pointing at bootstrap server 127.0.0.1:9092 and
 * topic `prioritize-input-topic`.
 */
49 @RunWith(SpringRunner::class)
52 @ContextConfiguration(
53 classes = [BluePrintMessageLibConfiguration::class,
54 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
55 MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
60 "spring.jpa.show-sql=true",
61 "spring.jpa.properties.hibernate.show_sql=true",
62 "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
64 "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
65 "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
66 "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
67 "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
69 // To send initial test message
70 "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
71 "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
72 "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
75 open class MessagePrioritizationConsumerTest {
// Logger used for the section banners printed by testMessagePrioritizationService.
77 private val log = logger(MessagePrioritizationConsumerTest::class)
// The properties below are lateinit, i.e. assigned after construction.
// NOTE(review): presumably injected by Spring; the injection annotations are not visible here - confirm.
// Handed to BluePrintDependencyService.inject(...) before the tests use the dependency service.
80 lateinit var applicationContext: ApplicationContext
// Only checked for presence (assertNotNull) in testBluePrintKafkaJDBCKeyStore.
83 lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
// Source of the message consumer service, the message producer service and the consumer properties used by the tests.
86 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
// Service whose prioritize(...) is exercised in testMessagePrioritizationService.
89 lateinit var messagePrioritizationService: MessagePrioritizationService
// Consumer that testMessagePrioritizationConsumer starts and shuts down.
92 lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
// Registers the test's ApplicationContext with BluePrintDependencyService.
// NOTE(review): presumably this is what lets BluePrintDependencyService.instance(...) resolve beans in the tests;
// the enclosing function header is not visible in this excerpt - confirm it runs before each test.
96 BluePrintDependencyService.inject(applicationContext)
// Saves sample messages in state NEW through MessagePrioritizationStateService and asserts that each saved
// message can be fetched again by the id returned from saveMessage.
100 fun testBluePrintKafkaJDBCKeyStore() {
102 assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
// The state service is looked up through BluePrintDependencyService. Note that this local shadows the
// class-level messagePrioritizationService property, which has a different type (MessagePrioritizationService).
104 val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
105 .instance(MessagePrioritizationStateService::class)
106 assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
// NOTE(review): the second argument (1) is presumably the number of sample messages to generate - confirm.
108 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
// Round trip: save the sample, then read it back using the id of the saved message.
109 val message = messagePrioritizationService.saveMessage(it)
110 val repoResult = messagePrioritizationService.getMessage(message.id)
111 assertNotNull(repoResult, "failed to get inserted message.")
// Checks that the class-level messagePrioritizationService property has been initialized, then feeds it three
// kinds of sample messages through prioritize(...): plain samples, samples sharing the "same-group" correlation,
// and samples of different types sharing the "group-typed" correlation.
// In the lines visible here the results of prioritize(...) are not asserted on.
117 fun testMessagePrioritizationService() {
// Guard against reading a lateinit property that was never assigned.
120 ::messagePrioritizationService.isInitialized,
121 "failed to initialize messagePrioritizationService"
124 log.info("**************** without Correlation **************")
125 /** Checking without correlation */
126 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
127 messagePrioritizationService.prioritize(it)
129 log.info("**************** Same Group , with Correlation **************")
130 /** checking same group with correlation */
131 MessagePrioritizationSample
132 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
135 messagePrioritizationService.prioritize(it)
137 log.info("**************** Different Type , with Correlation **************")
138 /** checking different type, with correlation */
139 MessagePrioritizationSample
140 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
143 messagePrioritizationService.prioritize(it)
// Exercises MessagePrioritizationConsumer start-up and shutdown against a spied consumer service:
// obtains the consumer service for the sample configuration's input topic selector, builds the stream topology
// from the "prioritize-input" consumer properties, then calls startConsuming and shutDown on a spied consumer.
149 fun testStartConsuming() {
151 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
153 val streamingConsumerService = bluePrintMessageLibPropertyService
154 .blueprintMessageConsumerService(configuration.inputTopicSelector)
155 assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
// Wrap the consumer service in a spy and stub consume/shutDown to return Unit.
// NOTE(review): presumably so the test does not need a live Kafka broker - confirm.
157 val spyStreamingConsumerService = spyk(streamingConsumerService)
158 coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
159 coEvery { spyStreamingConsumerService.shutDown() } returns Unit
// A fresh consumer is constructed here; this local shadows the class-level messagePrioritizationConsumer property.
160 val messagePrioritizationConsumer = MessagePrioritizationConsumer(
161 bluePrintMessageLibPropertyService
163 val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Build the topology directly from the consumer function and only assert that one is returned.
166 val kafkaStreamConsumerFunction =
167 spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
168 val messageConsumerProperties = bluePrintMessageLibPropertyService
169 .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
170 val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
171 assertNotNull(topology, "failed to get create topology")
// Make the spied consumer hand out the stubbed consumer service, then run the start/stop sequence.
173 every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
174 spyMessagePrioritizationConsumer.startConsuming(configuration)
175 spyMessagePrioritizationConsumer.shutDown()
179 /** Kafka integration test: enable and run this test case only for local desktop testing against a real Kafka broker. */
// Starts the class-level messagePrioritizationConsumer with the sample prioritization configuration, publishes
// three sets of sample messages through the "prioritize-input" producer, and finally shuts the consumer down.
// The three sets mirror testMessagePrioritizationService: plain samples, "same-group" correlated samples and
// "group-typed" samples of different types.
181 fun testMessagePrioritizationConsumer() {
183 messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
185 /** Send sample messages, one every 1 sec (NOTE(review): the delay itself is not visible in this excerpt - confirm). */
// The producer is cast to KafkaBasicAuthMessageProducerService, on which sendMessageNB is called below.
186 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
187 .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
189 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
// Each message's id is put into an "id" header entry; the message itself is sent as a JSON string.
191 val headers: MutableMap<String, String> = hashMapOf()
192 headers["id"] = it.id
193 blueprintMessageProducerService.sendMessageNB(
194 message = it.asJsonString(false),
199 MessagePrioritizationSample
200 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
203 val headers: MutableMap<String, String> = hashMapOf()
204 headers["id"] = it.id
205 blueprintMessageProducerService.sendMessageNB(
206 message = it.asJsonString(false),
211 MessagePrioritizationSample
212 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
215 val headers: MutableMap<String, String> = hashMapOf()
216 headers["id"] = it.id
217 blueprintMessageProducerService.sendMessageNB(
218 message = it.asJsonString(false),
224 messagePrioritizationConsumer.shutDown()