Renaming Files having BluePrint to have Blueprint
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritization / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumerTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import io.mockk.coEvery
20 import io.mockk.every
21 import io.mockk.mockk
22 import io.mockk.spyk
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.junit.Before
27 import org.junit.runner.RunWith
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
42 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BlueprintNatsLibConfiguration
43 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BlueprintNatsLibPropertyService
44 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
45 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
46 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
47 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
52 import org.springframework.context.ApplicationContext
53 import org.springframework.test.annotation.DirtiesContext
54 import org.springframework.test.context.ContextConfiguration
55 import org.springframework.test.context.TestPropertySource
56 import org.springframework.test.context.junit4.SpringRunner
57 import kotlin.test.Test
58 import kotlin.test.assertNotNull
59
/**
 * Spring-backed tests for the message prioritization function.
 *
 * Covers saving/reading a message through [MessagePrioritizationStateService], running the
 * sample prioritization service over sample messages, wiring of the Kafka prioritization
 * consumer (with the streaming consumer stubbed) and the scheduler start/shutdown cycle.
 *
 * Two additional scenarios talk to a real Kafka broker / NATS streaming server. Their `@Test`
 * annotation is commented out, so they only run when enabled by hand on a local desktop.
 */
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
    classes = [
        BlueprintMessageLibConfiguration::class, BlueprintNatsLibConfiguration::class,
        BlueprintPropertyConfiguration::class, BlueprintPropertiesService::class,
        MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
    ]
)
@TestPropertySource(
    properties =
        [
            "spring.jpa.show-sql=false",
            "spring.jpa.properties.hibernate.show_sql=false",
            "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",

            "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
            "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
            "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
            "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
            "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
            "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
            "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
            "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
            "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",

            // To send initial test message
            "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
            "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
            "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
            "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
            "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
            "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
            "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
            "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",

            "blueprintsprocessor.nats.cds-controller.type=token-auth",
            "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
            "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
        ]
)
open class MessagePrioritizationConsumerTest {

    private val log = logger(MessagePrioritizationConsumerTest::class)

    @Autowired
    lateinit var applicationContext: ApplicationContext

    @Autowired
    lateinit var prioritizationMessageRepository: PrioritizationMessageRepository

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService

    @Autowired
    lateinit var bluePrintNatsLibPropertyService: BlueprintNatsLibPropertyService

    @Autowired
    lateinit var messagePrioritizationStateService: MessagePrioritizationStateService

    /** Hands the Spring application context to [BlueprintDependencyService] before each test. */
    @Before
    fun setup() {
        BlueprintDependencyService.inject(applicationContext)
    }

    /** Saves one sample message through the state service and checks it can be read back by id. */
    @Test
    fun testBlueprintKafkaJDBCKeyStore() {
        runBlocking {
            assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")

            // Resolve the state service through the dependency service rather than the autowired field.
            val stateService: MessagePrioritizationStateService = BlueprintDependencyService
                .instance(MessagePrioritizationStateService::class)
            assertNotNull(stateService, "failed to get messagePrioritizationService")

            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
                val savedMessage = stateService.saveMessage(it)
                val repoResult = stateService.getMessage(savedMessage.id)
                assertNotNull(repoResult, "failed to get inserted message.")
            }
        }
    }

    /**
     * Runs the sample prioritization service over three sets of sample messages: uncorrelated,
     * same group with correlation, and different types with the same correlation.
     */
    @Test
    fun testMessagePrioritizationService() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val messagePrioritizationService =
                SampleMessagePrioritizationService(messagePrioritizationStateService)
            messagePrioritizationService.setConfiguration(configuration)

            log.info("****************  without Correlation **************")
            // Checking without correlation
            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
                messagePrioritizationService.prioritize(it)
            }

            log.info("****************  Same Group , with Correlation **************")
            // Checking same group with correlation
            MessagePrioritizationSample
                .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                .forEach {
                    delay(10)
                    messagePrioritizationService.prioritize(it)
                }

            log.info("****************  Different Type , with Correlation **************")
            // Checking different type, with correlation
            MessagePrioritizationSample
                .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                .forEach {
                    delay(10)
                    messagePrioritizationService.prioritize(it)
                }
        }
    }

    /**
     * Builds the Kafka streams topology from the consumer properties and then drives
     * `startConsuming` / `shutDown` on a spied consumer whose streaming service is stubbed,
     * so no broker is contacted.
     */
    @Test
    fun testStartConsuming() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            // Fail with a readable assertion instead of a bare NPE when the sample has no Kafka section.
            val kafkaConfiguration =
                assertNotNull(configuration.kafkaConfiguration, "failed to get kafka Configuration")

            val streamingConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService(kafkaConfiguration.inputTopicSelector)
            assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")

            // Stub the calls that would otherwise reach a Kafka broker.
            val spyStreamingConsumerService = spyk(streamingConsumerService)
            coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
            coEvery { spyStreamingConsumerService.shutDown() } returns Unit
            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
                bluePrintMessageLibPropertyService, mockk()
            )
            val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)

            // Test Topology
            val kafkaStreamConsumerFunction =
                spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
            val messageConsumerProperties = bluePrintMessageLibPropertyService
                .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
            val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
            assertNotNull(topology, "failed to get create topology")

            every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
            spyMessagePrioritizationConsumer.startConsuming(configuration)
            spyMessagePrioritizationConsumer.shutDown()
        }
    }

    /** Starts the scheduler in one coroutine and requests its shutdown from another after 20 ms. */
    @Test
    fun testSchedulerService() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val messagePrioritizationService =
                SampleMessagePrioritizationService(messagePrioritizationStateService)
            messagePrioritizationService.setConfiguration(configuration)

            val messagePrioritizationSchedulerService =
                MessagePrioritizationSchedulerService(messagePrioritizationService)
            launch {
                messagePrioritizationSchedulerService.startScheduling()
            }
            launch {
                // To debug, increase the delay time
                delay(20)
                messagePrioritizationSchedulerService.shutdownScheduling()
            }
        }
    }

    /**
     * Integration Kafka testing. Enable and use this test case only for local desktop testing
     * with a real Kafka broker.
     */
    // @Test
    fun testKafkaMessagePrioritizationConsumer() {
        runBlocking {

            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val kafkaMessagePrioritizationService =
                SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
            kafkaMessagePrioritizationService.setConfiguration(configuration)

            val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
                messagePrioritizationStateService,
                kafkaMessagePrioritizationService
            )

            // Register the processor
            BlueprintDependencyService.registerSingleton(
                MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                defaultMessagePrioritizeProcessor
            )

            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
                bluePrintMessageLibPropertyService,
                kafkaMessagePrioritizationService
            )
            messagePrioritizationConsumer.startConsuming(configuration)

            // Once consuming has started, always shut the consumer down, even if sending fails.
            try {
                val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                    .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService

                // Send the sample messages, pausing 100 ms (2 s for the typed group) before each one.
                launch {
                    MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
                        delay(100)
                        sendSampleMessage(blueprintMessageProducerService, it.id, it.asJsonString(false))
                    }

                    MessagePrioritizationSample
                        .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                        .forEach {
                            delay(100)
                            sendSampleMessage(blueprintMessageProducerService, it.id, it.asJsonString(false))
                        }

                    MessagePrioritizationSample
                        .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                        .forEach {
                            delay(2000)
                            sendSampleMessage(blueprintMessageProducerService, it.id, it.asJsonString(false))
                        }
                }
                // Keep the consumer alive for 10 s before shutting it down.
                delay(10000)
            } finally {
                messagePrioritizationConsumer.shutDown()
            }
        }
    }

    /**
     * Integration NATS testing. Enable and use this test case only for local desktop testing
     * with a real NATS streaming server.
     *  Start :
     *  nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
     */
    // @Test
    fun testNatsMessagePrioritizationConsumer() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            // assertNotNull returns the checked value, so no `!!` is needed afterwards.
            val natsConfiguration =
                assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")

            val inputSubject =
                NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject)

            val natsMessagePrioritizationService =
                SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
            natsMessagePrioritizationService.setConfiguration(configuration)

            val messagePrioritizationConsumer =
                NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
            messagePrioritizationConsumer.startConsuming()

            // Once consuming has started, always shut the consumer down, even if publishing fails.
            try {
                val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService

                // Publish the sample messages, pausing 100 ms (200 ms for the typed group) before each one.
                launch {
                    MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
                        delay(100)
                        bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
                    }

                    MessagePrioritizationSample
                        .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                        .forEach {
                            delay(100)
                            bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
                        }

                    MessagePrioritizationSample
                        .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                        .forEach {
                            delay(200)
                            bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
                        }
                }
                // Keep the consumer alive for 3 s before shutting it down.
                delay(3000)
            } finally {
                messagePrioritizationConsumer.shutDown()
            }
        }
    }

    /**
     * Sends [payload] through [producer] under the fixed key "mykey", with [id] as the only
     * header (header name "id").
     */
    private suspend fun sendSampleMessage(producer: KafkaMessageProducerService, id: String, payload: String) {
        val headers: MutableMap<String, String> = hashMapOf("id" to id)
        producer.sendMessageNB(
            key = "mykey",
            message = payload,
            headers = headers
        )
    }
}