2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
19 import io.mockk.coEvery
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.junit.Before
27 import org.junit.runner.RunWith
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
42 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
43 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
44 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
45 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
46 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
47 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
52 import org.springframework.context.ApplicationContext
53 import org.springframework.test.annotation.DirtiesContext
54 import org.springframework.test.context.ContextConfiguration
55 import org.springframework.test.context.TestPropertySource
56 import org.springframework.test.context.junit4.SpringRunner
57 import kotlin.test.Test
58 import kotlin.test.assertNotNull
/**
 * Spring-driven tests for the message prioritization state service, the scheduler service and
 * the Kafka / NATS prioritization consumers.
 *
 * The test context is built from the message-lib, NATS-lib, property and prioritization
 * configuration classes listed in the context configuration, plus [TestDatabaseConfiguration].
 */
60 @RunWith(SpringRunner::class)
63 @ContextConfiguration(
64 classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
65 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
66 MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
// JPA / Hibernate: SQL logging switched off, standard physical naming strategy.
71 "spring.jpa.show-sql=false",
72 "spring.jpa.properties.hibernate.show_sql=false",
73 "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
// Message consumer "prioritize-input": type kafka-streams-basic-auth, broker at 127.0.0.1:9092.
75 "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
76 "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
77 "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
78 "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
80 // To send initial test message
// Message producer "prioritize-input": type kafka-basic-auth, same broker and topic as the consumer.
81 "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
82 "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
83 "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
// NATS connection "cds-controller": token authentication against nats://localhost:4222.
85 "blueprintsprocessor.nats.cds-controller.type=token-auth",
86 "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
87 "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
90 open class MessagePrioritizationConsumerTest {
92 private val log = logger(MessagePrioritizationConsumerTest::class)
// Collaborators used by the tests below.
// NOTE(review): the lines that wire these lateinit properties (presumably Spring injection
// annotations) are not visible in this excerpt; confirm against the full file.
95 lateinit var applicationContext: ApplicationContext
98 lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
101 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
104 lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
107 lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
// Hands the application context to BluePrintDependencyService, which the tests use for
// instance lookup and singleton registration.
// NOTE(review): the declaration of the function enclosing this call is not visible in this
// excerpt (presumably a per-test setup method); confirm.
111 BluePrintDependencyService.inject(applicationContext)
/**
 * Checks that a sample message can be saved and then read back by id through
 * [MessagePrioritizationStateService].
 *
 * The state service is looked up through [BluePrintDependencyService] rather than taken from
 * the class-level property, so the test also confirms that the lookup returns an instance.
 */
115 fun testBluePrintKafkaJDBCKeyStore() {
117 assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
// Resolve the state service by type from the dependency service.
119 val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
120 .instance(MessagePrioritizationStateService::class)
121 assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
// Save each sample message in state NEW, then fetch it again by the id of the saved message.
123 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
124 val message = messagePrioritizationService.saveMessage(it)
125 val repoResult = messagePrioritizationService.getMessage(message.id)
126 assertNotNull(repoResult, "failed to get inserted message.")
/**
 * Runs `prioritize` on a [SampleMessagePrioritizationService] for three kinds of sample input:
 * messages without correlation, messages of the same group sharing a correlation, and messages
 * of different types sharing a correlation.
 *
 * NOTE(review): no assertion is visible in this excerpt; as shown, the test only checks that
 * the calls complete. Confirm against the full file.
 */
132 fun testMessagePrioritizationService() {
// Service under test: backed by the class-level state service and the sample configuration.
134 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
135 val messagePrioritizationService =
136 SampleMessagePrioritizationService(messagePrioritizationStateService)
137 messagePrioritizationService.setConfiguration(configuration)
139 log.info("**************** without Correlation **************")
140 /** Checking without correlation */
141 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
142 messagePrioritizationService.prioritize(it)
144 log.info("**************** Same Group , with Correlation **************")
145 /** checking same group with correlation */
146 MessagePrioritizationSample
147 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
// NOTE(review): the line that opens the lambda supplying `it` is not visible in this excerpt
// (presumably a forEach over the samples above); confirm.
150 messagePrioritizationService.prioritize(it)
152 log.info("**************** Different Type , with Correlation **************")
153 /** checking different type, with correlation */
154 MessagePrioritizationSample
155 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
// NOTE(review): as above, the lambda opener for `it` is not visible here; confirm.
158 messagePrioritizationService.prioritize(it)
/**
 * Exercises [KafkaMessagePrioritizationConsumer] start-up and shut-down with the underlying
 * consumer service replaced by a MockK spy whose `consume` and `shutDown` are stubbed.
 *
 * Also checks that the stream consumer function built from the sample configuration can create
 * a topology from the "prioritize-input" consumer properties.
 */
164 fun testStartConsuming() {
166 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
// Resolve the real consumer service for the configured input topic selector.
// The `!!` makes the test fail immediately if the sample configuration has no Kafka section.
168 val streamingConsumerService = bluePrintMessageLibPropertyService
169 .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
170 assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on it and stub consume/shutDown to return Unit
// (presumably so that no broker connection is attempted; confirm).
172 val spyStreamingConsumerService = spyk(streamingConsumerService)
173 coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
174 coEvery { spyStreamingConsumerService.shutDown() } returns Unit
// Consumer under test, built with a mock as its second constructor argument.
175 val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
176 bluePrintMessageLibPropertyService, mockk()
178 val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Build the stream consumer function and confirm it can create a topology.
181 val kafkaStreamConsumerFunction =
182 spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
183 val messageConsumerProperties = bluePrintMessageLibPropertyService
184 .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
185 val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
186 assertNotNull(topology, "failed to get create topology")
// Route consumerService(...) to the stubbed spy, then run the start/shutdown cycle.
188 every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
189 spyMessagePrioritizationConsumer.startConsuming(configuration)
190 spyMessagePrioritizationConsumer.shutDown()
/**
 * Starts and then shuts down a [MessagePrioritizationSchedulerService] that wraps a
 * [SampleMessagePrioritizationService] configured with the sample prioritization configuration.
 *
 * NOTE(review): the lines between start and shutdown are not visible in this excerpt; the
 * existing comment mentions a delay, so the wait presumably sits there. Confirm.
 */
195 fun testSchedulerService() {
197 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
198 val messagePrioritizationService =
199 SampleMessagePrioritizationService(messagePrioritizationStateService)
200 messagePrioritizationService.setConfiguration(configuration)
// The scheduler is constructed around the configured prioritization service.
202 val messagePrioritizationSchedulerService =
203 MessagePrioritizationSchedulerService(messagePrioritizationService)
205 messagePrioritizationSchedulerService.startScheduling()
208 /** To debug increase the delay time */
210 messagePrioritizationSchedulerService.shutdownScheduling()
215 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// Flow: configure a SampleKafkaMessagePrioritizationService, register a
// DefaultMessagePrioritizeProcessor under MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
// start the Kafka consumer, publish three batches of sample messages through the
// "prioritize-input" producer, then shut the consumer down.
217 fun testKafkaMessagePrioritizationConsumer() {
220 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
221 val kafkaMessagePrioritizationService =
222 SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
223 kafkaMessagePrioritizationService.setConfiguration(configuration)
// The processor is given both the state service and the prioritization service.
225 val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
226 messagePrioritizationStateService,
227 kafkaMessagePrioritizationService
230 // Register the processor
231 BluePrintDependencyService.registerSingleton(
232 MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
233 defaultMessagePrioritizeProcessor
236 val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
237 bluePrintMessageLibPropertyService,
238 kafkaMessagePrioritizationService
240 messagePrioritizationConsumer.startConsuming(configuration)
242 /** Send sample message with every 1 sec */
// The cast relies on the "prioritize-input" producer being configured with type kafka-basic-auth
// in the test properties of this class.
243 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
244 .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
// Batch 1: plain sample messages. A header map holding the message id is built for each one.
// NOTE(review): the remaining sendMessageNB arguments are not visible in this excerpt; confirm
// that the header map is actually passed.
246 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
248 val headers: MutableMap<String, String> = hashMapOf()
249 headers["id"] = it.id
250 blueprintMessageProducerService.sendMessageNB(
251 message = it.asJsonString(false),
// Batch 2: messages of group "same-group" sharing a correlation.
256 MessagePrioritizationSample
257 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
260 val headers: MutableMap<String, String> = hashMapOf()
261 headers["id"] = it.id
262 blueprintMessageProducerService.sendMessageNB(
263 message = it.asJsonString(false),
// Batch 3: messages of different types sharing a correlation, group "group-typed".
268 MessagePrioritizationSample
269 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
272 val headers: MutableMap<String, String> = hashMapOf()
273 headers["id"] = it.id
274 blueprintMessageProducerService.sendMessageNB(
275 message = it.asJsonString(false),
281 messagePrioritizationConsumer.shutDown()
285 /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real NATS streaming server
287 * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
290 fun testNatsMessagePrioritizationConsumer() {
// Flow: check that the sample configuration carries a NATS section, start a
// NatsMessagePrioritizationConsumer backed by SampleNatsMessagePrioritizationService, publish
// three batches of sample messages to the input subject, then shut the consumer down.
292 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
293 assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
// NOTE(review): the declaration that receives this value is not visible in this excerpt;
// presumably it is the `inputSubject` used by the publish calls below. Confirm.
296 NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
298 val natsMessagePrioritizationService =
299 SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
300 natsMessagePrioritizationService.setConfiguration(configuration)
302 val messagePrioritizationConsumer =
303 NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
304 messagePrioritizationConsumer.startConsuming()
306 /** Send sample message with every 1 sec */
// Publishing reuses the NATS service instance held by the consumer.
307 val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
// Batch 1: plain sample messages, converted to a JSON type and then to a byte array.
310 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
312 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
// Batch 2: messages of group "same-group" sharing a correlation.
315 MessagePrioritizationSample
316 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
319 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
// Batch 3: messages of different types sharing a correlation, group "group-typed".
322 MessagePrioritizationSample
323 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
326 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
330 messagePrioritizationConsumer.shutDown()