Revert "Renaming Files having BluePrint to have Blueprint"
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritization / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumerTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  * Modifications Copyright © 2021 Bell Canada.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
19
20 import io.micrometer.core.instrument.MeterRegistry
21 import io.mockk.coEvery
22 import io.mockk.every
23 import io.mockk.mockk
24 import io.mockk.spyk
25 import kotlinx.coroutines.delay
26 import kotlinx.coroutines.launch
27 import kotlinx.coroutines.runBlocking
28 import org.junit.Before
29 import org.junit.runner.RunWith
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
39 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
40 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
42 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
43 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
44 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
45 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
46 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
47 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
48 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
49 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
50 import org.onap.ccsdk.cds.controllerblueprints.core.logger
51 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
52 import org.springframework.beans.factory.annotation.Autowired
53 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
54 import org.springframework.boot.test.mock.mockito.MockBean
55 import org.springframework.context.ApplicationContext
56 import org.springframework.test.annotation.DirtiesContext
57 import org.springframework.test.context.ContextConfiguration
58 import org.springframework.test.context.TestPropertySource
59 import org.springframework.test.context.junit4.SpringRunner
60 import kotlin.test.Test
61 import kotlin.test.assertNotNull
62
63 @RunWith(SpringRunner::class)
64 @DataJpaTest
65 @DirtiesContext
66 @ContextConfiguration(
67     classes = [
68         BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
69         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
70         MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
71     ]
72 )
73 @TestPropertySource(
74     properties =
75         [
76             "spring.jpa.show-sql=false",
77             "spring.jpa.properties.hibernate.show_sql=false",
78             "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
79
80             "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
81             "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
82             "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
83             "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
84             "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
85             "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
86             "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
87             "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
88             "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
89             "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
90
91             // To send initial test message
92             "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
93             "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
94             "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
95             "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
96             "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
97             "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
98             "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
99             "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
100             "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
101
102             "blueprintsprocessor.nats.cds-controller.type=token-auth",
103             "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
104             "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
105         ]
106 )
107 open class MessagePrioritizationConsumerTest {
108
109     private val log = logger(MessagePrioritizationConsumerTest::class)
110
111     @Autowired
112     lateinit var applicationContext: ApplicationContext
113
114     @Autowired
115     lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
116
117     @Autowired
118     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
119
120     @Autowired
121     lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
122
123     @Autowired
124     lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
125
126     @MockBean
127     lateinit var meterRegistry: MeterRegistry
128
129     @Before
130     fun setup() {
131         BluePrintDependencyService.inject(applicationContext)
132     }
133
134     @Test
135     fun testBluePrintKafkaJDBCKeyStore() {
136         runBlocking {
137             assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
138
139             val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
140                 .instance(MessagePrioritizationStateService::class)
141             assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
142
143             MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
144                 val message = messagePrioritizationService.saveMessage(it)
145                 val repoResult = messagePrioritizationService.getMessage(message.id)
146                 assertNotNull(repoResult, "failed to get inserted message.")
147             }
148         }
149     }
150
151     @Test
152     fun testMessagePrioritizationService() {
153         runBlocking {
154             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
155             val messagePrioritizationService =
156                 SampleMessagePrioritizationService(messagePrioritizationStateService)
157             messagePrioritizationService.setConfiguration(configuration)
158
159             log.info("****************  without Correlation **************")
160             /** Checking without correlation */
161             MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
162                 messagePrioritizationService.prioritize(it)
163             }
164             log.info("****************  Same Group , with Correlation **************")
165             /** checking same group with correlation */
166             MessagePrioritizationSample
167                 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
168                 .forEach {
169                     delay(10)
170                     messagePrioritizationService.prioritize(it)
171                 }
172             log.info("****************  Different Type , with Correlation **************")
173             /** checking different type, with correlation */
174             MessagePrioritizationSample
175                 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
176                 .forEach {
177                     delay(10)
178                     messagePrioritizationService.prioritize(it)
179                 }
180         }
181     }
182
183     @Test
184     fun testStartConsuming() {
185         runBlocking {
186             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
187
188             val streamingConsumerService = bluePrintMessageLibPropertyService
189                 .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
190             assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
191
192             val spyStreamingConsumerService = spyk(streamingConsumerService)
193             coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
194             coEvery { spyStreamingConsumerService.shutDown() } returns Unit
195             val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
196                 bluePrintMessageLibPropertyService, mockk()
197             )
198             val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
199
200             // Test Topology
201             val kafkaStreamConsumerFunction =
202                 spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
203             val messageConsumerProperties = bluePrintMessageLibPropertyService
204                 .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
205             val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
206             assertNotNull(topology, "failed to get create topology")
207
208             every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
209             spyMessagePrioritizationConsumer.startConsuming(configuration)
210             spyMessagePrioritizationConsumer.shutDown()
211         }
212     }
213
214     @Test
215     fun testSchedulerService() {
216         runBlocking {
217             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
218             val messagePrioritizationService =
219                 SampleMessagePrioritizationService(messagePrioritizationStateService)
220             messagePrioritizationService.setConfiguration(configuration)
221
222             val messagePrioritizationSchedulerService =
223                 MessagePrioritizationSchedulerService(messagePrioritizationService)
224             launch {
225                 messagePrioritizationSchedulerService.startScheduling()
226             }
227             launch {
228                 /** To debug increase the delay time */
229                 delay(20)
230                 messagePrioritizationSchedulerService.shutdownScheduling()
231             }
232         }
233     }
234
235     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
236     // @Test
237     fun testKafkaMessagePrioritizationConsumer() {
238         runBlocking {
239
240             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
241             val kafkaMessagePrioritizationService =
242                 SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
243             kafkaMessagePrioritizationService.setConfiguration(configuration)
244
245             val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
246                 messagePrioritizationStateService,
247                 kafkaMessagePrioritizationService
248             )
249
250             // Register the processor
251             BluePrintDependencyService.registerSingleton(
252                 MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
253                 defaultMessagePrioritizeProcessor
254             )
255
256             val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
257                 bluePrintMessageLibPropertyService,
258                 kafkaMessagePrioritizationService
259             )
260             messagePrioritizationConsumer.startConsuming(configuration)
261
262             /** Send sample message with every 1 sec */
263             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
264                 .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
265             launch {
266                 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
267                     delay(100)
268                     val headers: MutableMap<String, String> = hashMapOf()
269                     headers["id"] = it.id
270                     blueprintMessageProducerService.sendMessageNB(
271                         key = "mykey",
272                         message = it.asJsonString(false),
273                         headers = headers
274                     )
275                 }
276
277                 MessagePrioritizationSample
278                     .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
279                     .forEach {
280                         delay(100)
281                         val headers: MutableMap<String, String> = hashMapOf()
282                         headers["id"] = it.id
283                         blueprintMessageProducerService.sendMessageNB(
284                             key = "mykey",
285                             message = it.asJsonString(false),
286                             headers = headers
287                         )
288                     }
289
290                 MessagePrioritizationSample
291                     .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
292                     .forEach {
293                         delay(2000)
294                         val headers: MutableMap<String, String> = hashMapOf()
295                         headers["id"] = it.id
296                         blueprintMessageProducerService.sendMessageNB(
297                             key = "mykey",
298                             message = it.asJsonString(false),
299                             headers = headers
300                         )
301                     }
302             }
303             delay(10000)
304             messagePrioritizationConsumer.shutDown()
305         }
306     }
307
308     /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
309      *  Start :
310      *  nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
311      * */
312     // @Test
313     fun testNatsMessagePrioritizationConsumer() {
314         runBlocking {
315             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
316             assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
317
318             val inputSubject =
319                 NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
320
321             val natsMessagePrioritizationService =
322                 SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
323             natsMessagePrioritizationService.setConfiguration(configuration)
324
325             val messagePrioritizationConsumer =
326                 NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
327             messagePrioritizationConsumer.startConsuming()
328
329             /** Send sample message with every 1 sec */
330             val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
331
332             launch {
333                 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
334                     delay(100)
335                     bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
336                 }
337
338                 MessagePrioritizationSample
339                     .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
340                     .forEach {
341                         delay(100)
342                         bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
343                     }
344
345                 MessagePrioritizationSample
346                     .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
347                     .forEach {
348                         delay(200)
349                         bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
350                     }
351             }
352             delay(3000)
353             messagePrioritizationConsumer.shutDown()
354         }
355     }
356 }