2 * Copyright © 2018-2019 AT&T Intellectual Property.
3 * Modifications Copyright © 2021 Bell Canada.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
20 import io.micrometer.core.instrument.MeterRegistry
21 import io.mockk.coEvery
25 import kotlinx.coroutines.delay
26 import kotlinx.coroutines.launch
27 import kotlinx.coroutines.runBlocking
28 import org.junit.Before
29 import org.junit.runner.RunWith
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
39 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
40 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration
42 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService
43 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
44 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BlueprintNatsLibConfiguration
45 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BlueprintNatsLibPropertyService
46 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
47 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
48 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
49 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
50 import org.onap.ccsdk.cds.controllerblueprints.core.logger
51 import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService
52 import org.springframework.beans.factory.annotation.Autowired
53 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
54 import org.springframework.boot.test.mock.mockito.MockBean
55 import org.springframework.context.ApplicationContext
56 import org.springframework.test.annotation.DirtiesContext
57 import org.springframework.test.context.ContextConfiguration
58 import org.springframework.test.context.TestPropertySource
59 import org.springframework.test.context.junit4.SpringRunner
60 import kotlin.test.Test
61 import kotlin.test.assertNotNull
63 @RunWith(SpringRunner::class)
66 @ContextConfiguration(
68 BlueprintMessageLibConfiguration::class, BlueprintNatsLibConfiguration::class,
69 BlueprintPropertyConfiguration::class, BlueprintPropertiesService::class,
70 MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
76 "spring.jpa.show-sql=false",
77 "spring.jpa.properties.hibernate.show_sql=false",
78 "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
80 "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
81 "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
82 "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
83 "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
84 "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
85 "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
86 "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
87 "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
88 "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
89 "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
91 // To send initial test message
92 "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
93 "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
94 "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
95 "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
96 "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
97 "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
98 "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
99 "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
100 "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
102 "blueprintsprocessor.nats.cds-controller.type=token-auth",
103 "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
104 "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
107 open class MessagePrioritizationConsumerTest {
109 private val log = logger(MessagePrioritizationConsumerTest::class)
112 lateinit var applicationContext: ApplicationContext
115 lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
118 lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
121 lateinit var bluePrintNatsLibPropertyService: BlueprintNatsLibPropertyService
124 lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
127 lateinit var meterRegistry: MeterRegistry
131 BlueprintDependencyService.inject(applicationContext)
135 fun testBlueprintKafkaJDBCKeyStore() {
137 assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
139 val messagePrioritizationService: MessagePrioritizationStateService = BlueprintDependencyService
140 .instance(MessagePrioritizationStateService::class)
141 assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
143 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
144 val message = messagePrioritizationService.saveMessage(it)
145 val repoResult = messagePrioritizationService.getMessage(message.id)
146 assertNotNull(repoResult, "failed to get inserted message.")
152 fun testMessagePrioritizationService() {
154 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
155 val messagePrioritizationService =
156 SampleMessagePrioritizationService(messagePrioritizationStateService)
157 messagePrioritizationService.setConfiguration(configuration)
159 log.info("**************** without Correlation **************")
160 /** Checking without correlation */
161 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
162 messagePrioritizationService.prioritize(it)
164 log.info("**************** Same Group , with Correlation **************")
165 /** checking same group with correlation */
166 MessagePrioritizationSample
167 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
170 messagePrioritizationService.prioritize(it)
172 log.info("**************** Different Type , with Correlation **************")
173 /** checking different type, with correlation */
174 MessagePrioritizationSample
175 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
178 messagePrioritizationService.prioritize(it)
184 fun testStartConsuming() {
186 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
188 val streamingConsumerService = bluePrintMessageLibPropertyService
189 .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
190 assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
192 val spyStreamingConsumerService = spyk(streamingConsumerService)
193 coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
194 coEvery { spyStreamingConsumerService.shutDown() } returns Unit
195 val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
196 bluePrintMessageLibPropertyService, mockk()
198 val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
201 val kafkaStreamConsumerFunction =
202 spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
203 val messageConsumerProperties = bluePrintMessageLibPropertyService
204 .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
205 val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
206 assertNotNull(topology, "failed to get create topology")
208 every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
209 spyMessagePrioritizationConsumer.startConsuming(configuration)
210 spyMessagePrioritizationConsumer.shutDown()
215 fun testSchedulerService() {
217 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
218 val messagePrioritizationService =
219 SampleMessagePrioritizationService(messagePrioritizationStateService)
220 messagePrioritizationService.setConfiguration(configuration)
222 val messagePrioritizationSchedulerService =
223 MessagePrioritizationSchedulerService(messagePrioritizationService)
225 messagePrioritizationSchedulerService.startScheduling()
228 /** To debug increase the delay time */
230 messagePrioritizationSchedulerService.shutdownScheduling()
235 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
237 fun testKafkaMessagePrioritizationConsumer() {
240 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
241 val kafkaMessagePrioritizationService =
242 SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
243 kafkaMessagePrioritizationService.setConfiguration(configuration)
245 val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
246 messagePrioritizationStateService,
247 kafkaMessagePrioritizationService
250 // Register the processor
251 BlueprintDependencyService.registerSingleton(
252 MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
253 defaultMessagePrioritizeProcessor
256 val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
257 bluePrintMessageLibPropertyService,
258 kafkaMessagePrioritizationService
260 messagePrioritizationConsumer.startConsuming(configuration)
262 /** Send sample message with every 1 sec */
263 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
264 .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
266 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
268 val headers: MutableMap<String, String> = hashMapOf()
269 headers["id"] = it.id
270 blueprintMessageProducerService.sendMessageNB(
272 message = it.asJsonString(false),
277 MessagePrioritizationSample
278 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
281 val headers: MutableMap<String, String> = hashMapOf()
282 headers["id"] = it.id
283 blueprintMessageProducerService.sendMessageNB(
285 message = it.asJsonString(false),
290 MessagePrioritizationSample
291 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
294 val headers: MutableMap<String, String> = hashMapOf()
295 headers["id"] = it.id
296 blueprintMessageProducerService.sendMessageNB(
298 message = it.asJsonString(false),
304 messagePrioritizationConsumer.shutDown()
308 /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
310 * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
313 fun testNatsMessagePrioritizationConsumer() {
315 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
316 assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
319 NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
321 val natsMessagePrioritizationService =
322 SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
323 natsMessagePrioritizationService.setConfiguration(configuration)
325 val messagePrioritizationConsumer =
326 NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
327 messagePrioritizationConsumer.startConsuming()
329 /** Send sample message with every 1 sec */
330 val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
333 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
335 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
338 MessagePrioritizationSample
339 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
342 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
345 MessagePrioritizationSample
346 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
349 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
353 messagePrioritizationConsumer.shutDown()