2 * Copyright © 2018-2019 AT&T Intellectual Property.
3 * Modifications Copyright © 2021 Bell Canada.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
20 import io.micrometer.core.instrument.MeterRegistry
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.runBlocking
24 import org.junit.Before
25 import org.junit.runner.RunWith
26 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
37 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
38 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
40 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
41 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
42 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
43 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
44 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
45 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
46 import org.onap.ccsdk.cds.controllerblueprints.core.logger
47 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
48 import org.springframework.beans.factory.annotation.Autowired
49 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
50 import org.springframework.boot.test.mock.mockito.MockBean
51 import org.springframework.context.ApplicationContext
52 import org.springframework.test.annotation.DirtiesContext
53 import org.springframework.test.context.ContextConfiguration
54 import org.springframework.test.context.TestPropertySource
55 import org.springframework.test.context.junit4.SpringRunner
56 import kotlin.test.Test
57 import kotlin.test.assertNotNull
59 @RunWith(SpringRunner::class)
62 @ContextConfiguration(
64 BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
65 BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
66 MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
72 "spring.jpa.show-sql=false",
73 "spring.jpa.properties.hibernate.show_sql=false",
74 "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
76 "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
77 "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
78 "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
79 "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
80 "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
81 "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
82 "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
83 "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
84 "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
85 "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
87 // To send initial test message
88 "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
89 "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
90 "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
91 "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
92 "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
93 "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
94 "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
95 "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
96 "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
98 "blueprintsprocessor.nats.cds-controller.type=token-auth",
99 "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
100 "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
103 open class MessagePrioritizationConsumerTest {
105 private val log = logger(MessagePrioritizationConsumerTest::class)
108 lateinit var applicationContext: ApplicationContext
111 lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
114 lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
117 lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
120 lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
123 lateinit var meterRegistry: MeterRegistry
127 BluePrintDependencyService.inject(applicationContext)
131 fun testBluePrintKafkaJDBCKeyStore() {
133 assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
135 val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
136 .instance(MessagePrioritizationStateService::class)
137 assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
139 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
140 val message = messagePrioritizationService.saveMessage(it)
141 val repoResult = messagePrioritizationService.getMessage(message.id)
142 assertNotNull(repoResult, "failed to get inserted message.")
148 fun testMessagePrioritizationService() {
150 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
151 val messagePrioritizationService =
152 SampleMessagePrioritizationService(messagePrioritizationStateService)
153 messagePrioritizationService.setConfiguration(configuration)
155 log.info("**************** without Correlation **************")
156 /** Checking without correlation */
157 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
158 messagePrioritizationService.prioritize(it)
160 log.info("**************** Same Group , with Correlation **************")
161 /** checking same group with correlation */
162 MessagePrioritizationSample
163 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
166 messagePrioritizationService.prioritize(it)
168 log.info("**************** Different Type , with Correlation **************")
169 /** checking different type, with correlation */
170 MessagePrioritizationSample
171 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
174 messagePrioritizationService.prioritize(it)
180 fun testSchedulerService() {
182 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
183 val messagePrioritizationService =
184 SampleMessagePrioritizationService(messagePrioritizationStateService)
185 messagePrioritizationService.setConfiguration(configuration)
187 val messagePrioritizationSchedulerService =
188 MessagePrioritizationSchedulerService(messagePrioritizationService)
190 messagePrioritizationSchedulerService.startScheduling()
193 /** To debug increase the delay time */
195 messagePrioritizationSchedulerService.shutdownScheduling()
200 /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
202 fun testKafkaMessagePrioritizationConsumer() {
205 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
206 val kafkaMessagePrioritizationService =
207 SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
208 kafkaMessagePrioritizationService.setConfiguration(configuration)
210 val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
211 messagePrioritizationStateService,
212 kafkaMessagePrioritizationService
215 // Register the processor
216 BluePrintDependencyService.registerSingleton(
217 MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
218 defaultMessagePrioritizeProcessor
221 val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
222 bluePrintMessageLibPropertyService,
223 kafkaMessagePrioritizationService
225 messagePrioritizationConsumer.startConsuming(configuration)
227 /** Send sample message with every 1 sec */
228 val blueprintMessageProducerService = bluePrintMessageLibPropertyService
229 .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
231 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
233 val headers: MutableMap<String, String> = hashMapOf()
234 headers["id"] = it.id
235 blueprintMessageProducerService.sendMessageNB(
237 message = it.asJsonString(false),
242 MessagePrioritizationSample
243 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
246 val headers: MutableMap<String, String> = hashMapOf()
247 headers["id"] = it.id
248 blueprintMessageProducerService.sendMessageNB(
250 message = it.asJsonString(false),
255 MessagePrioritizationSample
256 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
259 val headers: MutableMap<String, String> = hashMapOf()
260 headers["id"] = it.id
261 blueprintMessageProducerService.sendMessageNB(
263 message = it.asJsonString(false),
269 messagePrioritizationConsumer.shutDown()
273 /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
275 * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
278 fun testNatsMessagePrioritizationConsumer() {
280 val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
281 assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
284 NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
286 val natsMessagePrioritizationService =
287 SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
288 natsMessagePrioritizationService.setConfiguration(configuration)
290 val messagePrioritizationConsumer =
291 NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
292 messagePrioritizationConsumer.startConsuming()
294 /** Send sample message with every 1 sec */
295 val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
298 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
300 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
303 MessagePrioritizationSample
304 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
307 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
310 MessagePrioritizationSample
311 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
314 bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
318 messagePrioritizationConsumer.shutDown()