2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
19 import io.mockk.coEvery
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.junit.Before
27 import org.junit.runner.RunWith
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
42 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
43 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
44 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
45 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
46 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
47 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
52 import org.springframework.context.ApplicationContext
53 import org.springframework.test.annotation.DirtiesContext
54 import org.springframework.test.context.ContextConfiguration
55 import org.springframework.test.context.TestPropertySource
56 import org.springframework.test.context.junit4.SpringRunner
57 import kotlin.test.Test
58 import kotlin.test.assertNotNull
// Test class for the message prioritization consumers and services.
// It runs with SpringRunner, and the visible @ContextConfiguration entries list the
// message-lib, NATS-lib, property, prioritization and test-database configuration classes.
// NOTE(review): the original line numbers embedded at the start of each line skip values
// (for example 60 -> 63 and 67 -> 73), so parts of this declaration are not visible in
// this view; confirm against the complete file before relying on these notes.
60 @RunWith(SpringRunner::class)
63 @ContextConfiguration(
65 BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
66 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
67 MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
// JPA / Hibernate settings: SQL logging switched off and the standard physical naming strategy.
73 "spring.jpa.show-sql=false",
74 "spring.jpa.properties.hibernate.show_sql=false",
75 "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
// Message consumer settings under the "prioritize-input" key: a kafka-streams-scram-ssl-auth
// consumer pointed at 127.0.0.1:9092 and the topic "prioritize-input-topic".
77 "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
78 "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
79 "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
80 "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
81 "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
82 "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
83 "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
84 "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
85 "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
86 "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
// Matching producer settings (kafka-scram-ssl-auth) for the same broker address and topic.
88 // To send initial test message
89 "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
90 "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
91 "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
92 "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
93 "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
94 "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
95 "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
96 "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
97 "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
// NATS settings under the "cds-controller" key: token-auth against nats://localhost:4222.
99 "blueprintsprocessor.nats.cds-controller.type=token-auth",
100 "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
101 "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
104 open class MessagePrioritizationConsumerTest {
// Logger used by the tests to print section banners.
106 private val log = logger(MessagePrioritizationConsumerTest::class)
// Collaborators used by the tests below.
// NOTE(review): presumably populated by Spring before each test; the lines between
// these properties are not visible here, so confirm how they are initialised.
109 lateinit var applicationContext: ApplicationContext
112 lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
115 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
118 lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
121 lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
// Hands the application context to BluePrintDependencyService, which the first test
// below uses to resolve an instance by class.
// NOTE(review): the function declaration enclosing this call is not visible in this view.
125 BluePrintDependencyService.inject(applicationContext)
// Persistence round trip: asserts that the repository property is non-null, resolves a
// MessagePrioritizationStateService through BluePrintDependencyService, then saves the
// sample message(s) built for state NEW and asserts each can be fetched back.
// NOTE(review): embedded line numbers jump (129 -> 131, 140 -> 146), so the lines that
// wrap and close this body are not visible in this view.
129 fun testBluePrintKafkaJDBCKeyStore() {
131 assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
// Looked up by class from the dependency service instead of using the class property.
133 val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
134 .instance(MessagePrioritizationStateService::class)
135 assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
// Save each sample, then read it back using the id of the value returned by saveMessage.
137 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
138 val message = messagePrioritizationService.saveMessage(it)
139 val repoResult = messagePrioritizationService.getMessage(message.id)
140 assertNotNull(repoResult, "failed to get inserted message.")
// Drives SampleMessagePrioritizationService.prioritize with three batches of sample
// messages in state NEW: plain samples, samples sharing the correlation group
// "same-group", and differently typed samples sharing "group-typed".
// No assertion is visible in this body.
// NOTE(review): it appears to pass as long as prioritize does not throw; confirm
// against the complete file.
// NOTE(review): embedded line numbers jump (e.g. 161 -> 164 and 169 -> 172), so the
// iteration lines between the sample builders and the prioritize calls are not visible.
146 fun testMessagePrioritizationService() {
// Service under test, built on the class-level state service with the sample configuration.
148 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
149 val messagePrioritizationService =
150 SampleMessagePrioritizationService(messagePrioritizationStateService)
151 messagePrioritizationService.setConfiguration(configuration)
153 log.info("**************** without Correlation **************")
154 /** Checking without correlation */
155 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
156 messagePrioritizationService.prioritize(it)
158 log.info("**************** Same Group , with Correlation **************")
159 /** checking same group with correlation */
160 MessagePrioritizationSample
161 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
164 messagePrioritizationService.prioritize(it)
166 log.info("**************** Different Type , with Correlation **************")
167 /** checking different type, with correlation */
168 MessagePrioritizationSample
169 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
172 messagePrioritizationService.prioritize(it)
// Exercises KafkaMessagePrioritizationConsumer with the streaming consumer service
// replaced by a spy: asserts that the consumer service and the stream topology can be
// created, then calls startConsuming and shutDown on the spied consumer.
// NOTE(review): spyk, mockk and every are used below but their imports are not among
// the visible import lines, and embedded line numbers jump (e.g. 190 -> 192); confirm
// against the complete file.
178 fun testStartConsuming() {
180 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
// The non-null assertion throws if the sample configuration has no kafkaConfiguration.
182 val streamingConsumerService = bluePrintMessageLibPropertyService
183 .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
184 assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
// Stub consume and shutDown on the spy so that both simply return Unit.
186 val spyStreamingConsumerService = spyk(streamingConsumerService)
187 coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
188 coEvery { spyStreamingConsumerService.shutDown() } returns Unit
// The second constructor argument is a bare mock.
189 val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
190 bluePrintMessageLibPropertyService, mockk()
192 val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Build the stream consumer function and check it yields a topology for the
// consumer properties read from the "prioritize-input" prefix.
195 val kafkaStreamConsumerFunction =
196 spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
197 val messageConsumerProperties = bluePrintMessageLibPropertyService
198 .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
199 val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
200 assertNotNull(topology, "failed to get create topology")
// Make the consumer hand out the stubbed spy, then run the start / shutdown sequence.
202 every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
203 spyMessagePrioritizationConsumer.startConsuming(configuration)
204 spyMessagePrioritizationConsumer.shutDown()
// Starts and then shuts down a MessagePrioritizationSchedulerService that wraps a
// SampleMessagePrioritizationService configured with the sample configuration.
// No assertion is visible in this body.
// NOTE(review): embedded line numbers jump (219 -> 222 -> 224), so the lines around the
// start and shutdown calls, including the delay the comment below refers to, are not
// visible in this view.
209 fun testSchedulerService() {
211 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
212 val messagePrioritizationService =
213 SampleMessagePrioritizationService(messagePrioritizationStateService)
214 messagePrioritizationService.setConfiguration(configuration)
// Scheduler under test, constructed around the configured prioritization service.
216 val messagePrioritizationSchedulerService =
217 MessagePrioritizationSchedulerService(messagePrioritizationService)
219 messagePrioritizationSchedulerService.startScheduling()
222 /** To debug increase the delay time */
224 messagePrioritizationSchedulerService.shutdownScheduling()
229 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// Flow: configure a SampleKafkaMessagePrioritizationService, register a
// DefaultMessagePrioritizeProcessor as a singleton, start the consumer, publish three
// batches of sample messages through the "prioritize-input" producer, then shut down.
// NOTE(review): embedded line numbers jump throughout (e.g. 231 -> 234, 264 -> 266,
// 266 -> 271), so most sendMessageNB arguments and the iteration and closing lines
// are not visible in this view.
231 fun testKafkaMessagePrioritizationConsumer() {
234 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
235 val kafkaMessagePrioritizationService =
236 SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
237 kafkaMessagePrioritizationService.setConfiguration(configuration)
// The processor is given both the state service and the prioritization service.
239 val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
240 messagePrioritizationStateService,
241 kafkaMessagePrioritizationService
244 // Register the processor
245 BluePrintDependencyService.registerSingleton(
246 MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
247 defaultMessagePrioritizeProcessor
250 val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
251 bluePrintMessageLibPropertyService,
252 kafkaMessagePrioritizationService
254 messagePrioritizationConsumer.startConsuming(configuration)
256 /** Send sample message with every 1 sec */
// The cast fails at runtime if the "prioritize-input" producer is not a KafkaMessageProducerService.
257 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
258 .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
// Batch 1: plain samples; each is sent as a JSON string with its id in an "id" header.
260 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
262 val headers: MutableMap<String, String> = hashMapOf()
263 headers["id"] = it.id
264 blueprintMessageProducerService.sendMessageNB(
266 message = it.asJsonString(false),
// Batch 2: samples that share the correlation group "same-group".
271 MessagePrioritizationSample
272 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
275 val headers: MutableMap<String, String> = hashMapOf()
276 headers["id"] = it.id
277 blueprintMessageProducerService.sendMessageNB(
279 message = it.asJsonString(false),
// Batch 3: samples of different types that share the correlation group "group-typed".
284 MessagePrioritizationSample
285 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
288 val headers: MutableMap<String, String> = hashMapOf()
289 headers["id"] = it.id
290 blueprintMessageProducerService.sendMessageNB(
292 message = it.asJsonString(false),
298 messagePrioritizationConsumer.shutDown()
302 /** Integration NATS testing. Enable and use this test case only for local desktop testing with a real NATS streaming server, e.g. one started with:
304 * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
// Flow: assert that the sample configuration carries a NATS configuration, configure a
// SampleNatsMessagePrioritizationService, start a NatsMessagePrioritizationConsumer,
// publish three batches of sample messages as byte arrays through the consumer's own
// bluePrintNatsService, then shut the consumer down.
// NOTE(review): embedded line numbers jump throughout (e.g. 310 -> 313, 324 -> 327,
// 333 -> 336), so the iteration and closing lines are not visible in this view.
307 fun testNatsMessagePrioritizationConsumer() {
309 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
310 assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
// NOTE(review): inputSubject, used by the publish calls below, is presumably assigned
// from this expression on a line that is not visible here; confirm.
313 NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
315 val natsMessagePrioritizationService =
316 SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
317 natsMessagePrioritizationService.setConfiguration(configuration)
// Unlike the Kafka consumer's startConsuming, this one is called without a configuration argument.
319 val messagePrioritizationConsumer =
320 NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
321 messagePrioritizationConsumer.startConsuming()
323 /** Send sample message with every 1 sec */
324 val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
// Batch 1: plain samples, converted to a JSON type and then to a byte array.
327 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
329 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
// Batch 2: samples that share the correlation group "same-group".
332 MessagePrioritizationSample
333 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
336 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
// Batch 3: samples of different types that share the correlation group "group-typed".
339 MessagePrioritizationSample
340 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
343 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
347 messagePrioritizationConsumer.shutDown()