2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
19 import io.mockk.coEvery
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.junit.Before
27 import org.junit.runner.RunWith
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
42 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
43 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
44 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
45 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
46 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
47 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
52 import org.springframework.context.ApplicationContext
53 import org.springframework.test.annotation.DirtiesContext
54 import org.springframework.test.context.ContextConfiguration
55 import org.springframework.test.context.TestPropertySource
56 import org.springframework.test.context.junit4.SpringRunner
57 import kotlin.test.Test
58 import kotlin.test.assertNotNull
/**
 * Tests for the message prioritization function: persistence through
 * [MessagePrioritizationStateService], the sample prioritization service, the Kafka
 * streams consumer wiring, the scheduler service, and Kafka / NATS consumer flows.
 *
 * The test properties below configure a `prioritize-input` Kafka streams consumer and a
 * `prioritize-input` Kafka producer (both SCRAM + SSL, pointing at 127.0.0.1:9092) and a
 * `cds-controller` NATS connection using token auth against nats://localhost:4222.
 */
60 @RunWith(SpringRunner::class)
63 @ContextConfiguration(
64 classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
65 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
66 MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
71 "spring.jpa.show-sql=false",
72 "spring.jpa.properties.hibernate.show_sql=false",
73 "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
75 "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
76 "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
77 "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
78 "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
79 "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
80 "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
81 "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
82 "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
83 "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
84 "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
86 // To send initial test message
87 "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
88 "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
89 "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
90 "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
91 "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
92 "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
93 "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
94 "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
95 "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
97 "blueprintsprocessor.nats.cds-controller.type=token-auth",
98 "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
99 "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
102 open class MessagePrioritizationConsumerTest {
104 private val log = logger(MessagePrioritizationConsumerTest::class)
// Collaborators used by the tests below. They are declared lateinit, so each one must be
// assigned before a test touches it (presumably by Spring injection - TODO confirm).
107 lateinit var applicationContext: ApplicationContext
110 lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
113 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
116 lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
119 lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
// Hand the application context to BluePrintDependencyService; presumably this is what lets the
// BluePrintDependencyService.instance(...) lookup in the tests resolve beans - verify.
123 BluePrintDependencyService.inject(applicationContext)
/**
 * Checks that the prioritization repository is available and that a sample message saved
 * through [MessagePrioritizationStateService] can be read back using the saved message's id.
 */
127 fun testBluePrintKafkaJDBCKeyStore() {
129 assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
// Look the state service up through BluePrintDependencyService instead of using the class field.
131 val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
132 .instance(MessagePrioritizationStateService::class)
133 assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
// Round trip: save each sample message built with MessageState.NEW.name, then fetch it by id.
135 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
136 val message = messagePrioritizationService.saveMessage(it)
137 val repoResult = messagePrioritizationService.getMessage(message.id)
138 assertNotNull(repoResult, "failed to get inserted message.")
/**
 * Runs [SampleMessagePrioritizationService.prioritize] over three sets of sample messages:
 * plain samples, samples sharing the "same-group" correlation, and samples of different
 * types sharing the "group-typed" correlation.
 *
 * NOTE(review): the only checks visible here are the calls themselves; the test appears
 * to pass as long as prioritize does not throw - confirm.
 */
144 fun testMessagePrioritizationService() {
// Build the sample service on top of the state service and apply the sample configuration.
146 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
147 val messagePrioritizationService =
148 SampleMessagePrioritizationService(messagePrioritizationStateService)
149 messagePrioritizationService.setConfiguration(configuration)
151 log.info("**************** without Correlation **************")
152 /** Checking without correlation */
153 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
154 messagePrioritizationService.prioritize(it)
156 log.info("**************** Same Group , with Correlation **************")
157 /** checking same group with correlation */
158 MessagePrioritizationSample
159 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
162 messagePrioritizationService.prioritize(it)
164 log.info("**************** Different Type , with Correlation **************")
165 /** checking different type, with correlation */
166 MessagePrioritizationSample
167 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
170 messagePrioritizationService.prioritize(it)
/**
 * Exercises [KafkaMessagePrioritizationConsumer] with the streaming consumer service replaced
 * by a spy: verifies that a consumer service and a stream topology can be created from the
 * test properties, then calls startConsuming and shutDown against the stubbed service.
 */
176 fun testStartConsuming() {
178 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
// Resolve the consumer service for the selector named in the sample Kafka configuration.
// The `!!` throws if the sample configuration has no kafkaConfiguration.
180 val streamingConsumerService = bluePrintMessageLibPropertyService
181 .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
182 assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
// Spy on the service and stub consume/shutDown to return Unit instead of running the real calls.
184 val spyStreamingConsumerService = spyk(streamingConsumerService)
185 coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
186 coEvery { spyStreamingConsumerService.shutDown() } returns Unit
// Consumer under test; its second constructor argument is a plain mock.
187 val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
188 bluePrintMessageLibPropertyService, mockk()
190 val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Build the stream topology from the consumer properties under the prioritize-input prefix.
193 val kafkaStreamConsumerFunction =
194 spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
195 val messageConsumerProperties = bluePrintMessageLibPropertyService
196 .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
197 val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
198 assertNotNull(topology, "failed to get create topology")
// Make the consumer hand out the stubbed service, then run the start/shutdown sequence.
200 every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
201 spyMessagePrioritizationConsumer.startConsuming(configuration)
202 spyMessagePrioritizationConsumer.shutDown()
/**
 * Starts and then shuts down a [MessagePrioritizationSchedulerService] that wraps a
 * [SampleMessagePrioritizationService] configured with the sample prioritization configuration.
 */
207 fun testSchedulerService() {
209 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
210 val messagePrioritizationService =
211 SampleMessagePrioritizationService(messagePrioritizationStateService)
212 messagePrioritizationService.setConfiguration(configuration)
// The scheduler is built around the configured sample service.
214 val messagePrioritizationSchedulerService =
215 MessagePrioritizationSchedulerService(messagePrioritizationService)
217 messagePrioritizationSchedulerService.startScheduling()
220 /** To debug increase the delay time */
222 messagePrioritizationSchedulerService.shutdownScheduling()
227 /** Integration Kafka testing: enable and use this test case only for local desktop testing with a real Kafka broker. */
229 fun testKafkaMessagePrioritizationConsumer() {
// Flow: configure the sample Kafka prioritization service, register the prioritize processor,
// start the consumer, publish three sets of sample messages with the "prioritize-input"
// producer, then shut the consumer down.
232 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
233 val kafkaMessagePrioritizationService =
234 SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
235 kafkaMessagePrioritizationService.setConfiguration(configuration)
// The processor is built from the state service and the sample Kafka prioritization service.
237 val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
238 messagePrioritizationStateService,
239 kafkaMessagePrioritizationService
242 // Register the processor
243 BluePrintDependencyService.registerSingleton(
244 MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
245 defaultMessagePrioritizeProcessor
248 val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
249 bluePrintMessageLibPropertyService,
250 kafkaMessagePrioritizationService
252 messagePrioritizationConsumer.startConsuming(configuration)
254 /** Send sample message with every 1 sec */
// The cast throws if the "prioritize-input" producer is not a KafkaMessageProducerService.
255 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
256 .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
// First set: plain sample messages; each send builds a header map holding the message id.
258 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
260 val headers: MutableMap<String, String> = hashMapOf()
261 headers["id"] = it.id
262 blueprintMessageProducerService.sendMessageNB(
264 message = it.asJsonString(false),
// Second set: sample messages sharing the "same-group" correlation.
269 MessagePrioritizationSample
270 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
273 val headers: MutableMap<String, String> = hashMapOf()
274 headers["id"] = it.id
275 blueprintMessageProducerService.sendMessageNB(
277 message = it.asJsonString(false),
// Third set: sample messages of different types sharing the "group-typed" correlation.
282 MessagePrioritizationSample
283 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
286 val headers: MutableMap<String, String> = hashMapOf()
287 headers["id"] = it.id
288 blueprintMessageProducerService.sendMessageNB(
290 message = it.asJsonString(false),
296 messagePrioritizationConsumer.shutDown()
300 /** Integration NATS testing: enable and use this test case only for local desktop testing with a real NATS streaming server.
302 * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
305 fun testNatsMessagePrioritizationConsumer() {
// Flow: check that the sample configuration carries a NATS section, start the NATS
// prioritization consumer, publish three sets of sample messages to the input subject
// through the consumer's own NATS service, then shut the consumer down.
307 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
308 assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
// Subject derived from the configured inputSubject via NatsClusterUtils.
311 NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
313 val natsMessagePrioritizationService =
314 SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
315 natsMessagePrioritizationService.setConfiguration(configuration)
317 val messagePrioritizationConsumer =
318 NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
319 messagePrioritizationConsumer.startConsuming()
321 /** Send sample message with every 1 sec */
322 val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
// First set: plain sample messages, published as the byte array of their JSON form.
325 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
327 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
// Second set: sample messages sharing the "same-group" correlation.
330 MessagePrioritizationSample
331 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
334 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
// Third set: sample messages of different types sharing the "group-typed" correlation.
337 MessagePrioritizationSample
338 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
341 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
345 messagePrioritizationConsumer.shutDown()