af8d902cd41cc1d0adbc1f5d0fd7249f03934405
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumerTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import io.mockk.coEvery
20 import io.mockk.every
21 import io.mockk.mockk
22 import io.mockk.spyk
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.junit.Before
27 import org.junit.runner.RunWith
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
42 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
43 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
44 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
45 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
46 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
47 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
52 import org.springframework.context.ApplicationContext
53 import org.springframework.test.annotation.DirtiesContext
54 import org.springframework.test.context.ContextConfiguration
55 import org.springframework.test.context.TestPropertySource
56 import org.springframework.test.context.junit4.SpringRunner
57 import kotlin.test.Test
58 import kotlin.test.assertNotNull
59
/**
 * Tests for the message prioritization state service, the sample prioritization services,
 * the scheduler service and the Kafka / NATS prioritization consumers.
 *
 * The class runs with [SpringRunner] on a `@DataJpaTest` context assembled from the configuration
 * classes listed in `@ContextConfiguration`; the Kafka and NATS connection settings are supplied
 * inline through `@TestPropertySource`.
 *
 * The two integration tests at the bottom have their `@Test` annotation commented out and are
 * meant to be enabled manually (see their own documentation).
 */
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
    classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
        MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
)
@TestPropertySource(
    properties =
    [
        "spring.jpa.show-sql=false",
        "spring.jpa.properties.hibernate.show_sql=false",
        "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",

        "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
        "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
        "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
        "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
        "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
        "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
        "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
        "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
        "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",

        // To send initial test message
        "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
        "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
        "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
        "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
        "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
        "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
        "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
        "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",

        "blueprintsprocessor.nats.cds-controller.type=token-auth",
        "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
        "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
    ]
)
open class MessagePrioritizationConsumerTest {

    private val log = logger(MessagePrioritizationConsumerTest::class)

    @Autowired
    lateinit var applicationContext: ApplicationContext

    @Autowired
    lateinit var prioritizationMessageRepository: PrioritizationMessageRepository

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    @Autowired
    lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService

    @Autowired
    lateinit var messagePrioritizationStateService: MessagePrioritizationStateService

    /** Hands the Spring context to [BluePrintDependencyService] so static lookups work in the tests. */
    @Before
    fun setup() {
        BluePrintDependencyService.inject(applicationContext)
    }

    /**
     * Saves one sample message in state NEW through the [MessagePrioritizationStateService]
     * resolved from [BluePrintDependencyService] and verifies it can be read back by its id.
     */
    @Test
    fun testBluePrintKafkaJDBCKeyStore() {
        runBlocking {
            assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")

            val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
                .instance(MessagePrioritizationStateService::class)
            assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")

            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { sample ->
                val message = messagePrioritizationService.saveMessage(sample)
                val repoResult = messagePrioritizationService.getMessage(message.id)
                assertNotNull(repoResult, "failed to get inserted message.")
            }
        }
    }

    /**
     * Runs [SampleMessagePrioritizationService.prioritize] over three sample sets: plain messages,
     * messages sharing one correlation group, and messages of different types sharing a correlation.
     *
     * There are no explicit assertions; the test passes when none of the calls throws.
     */
    @Test
    fun testMessagePrioritizationService() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val messagePrioritizationService =
                SampleMessagePrioritizationService(messagePrioritizationStateService)
            messagePrioritizationService.setConfiguration(configuration)

            log.info("****************  without Correlation **************")
            // Checking without correlation
            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { sample ->
                messagePrioritizationService.prioritize(sample)
            }
            log.info("****************  Same Group , with Correlation **************")
            // Checking same group with correlation; 10 ms pause before each message
            MessagePrioritizationSample
                .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                .forEach { sample ->
                    delay(10)
                    messagePrioritizationService.prioritize(sample)
                }
            log.info("****************  Different Type , with Correlation **************")
            // Checking different type, with correlation; 10 ms pause before each message
            MessagePrioritizationSample
                .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                .forEach { sample ->
                    delay(10)
                    messagePrioritizationService.prioritize(sample)
                }
        }
    }

    /**
     * Verifies that a Kafka streams topology can be created from the sample configuration and
     * that [KafkaMessagePrioritizationConsumer] can be started and shut down.
     *
     * The streaming consumer service is wrapped in a spy whose `consume` and `shutDown` are
     * stubbed to return `Unit`, and the consumer under test is spied so that `consumerService`
     * returns that stubbed spy.
     */
    @Test
    fun testStartConsuming() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            // Fail with a readable assertion instead of a bare NullPointerException.
            val kafkaConfiguration =
                assertNotNull(configuration.kafkaConfiguration, "failed to get kafka Configuration")

            val streamingConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService(kafkaConfiguration.inputTopicSelector)
            assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")

            val spyStreamingConsumerService = spyk(streamingConsumerService)
            coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
            coEvery { spyStreamingConsumerService.shutDown() } returns Unit
            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
                bluePrintMessageLibPropertyService, mockk()
            )
            val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)

            // Test Topology
            val kafkaStreamConsumerFunction =
                spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
            val messageConsumerProperties = bluePrintMessageLibPropertyService
                .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
            val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
            assertNotNull(topology, "failed to create topology")

            // Route the consumer to the stubbed spy before starting it.
            every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
            spyMessagePrioritizationConsumer.startConsuming(configuration)
            spyMessagePrioritizationConsumer.shutDown()
        }
    }

    /**
     * Starts [MessagePrioritizationSchedulerService] in one coroutine and requests its shutdown
     * from a second coroutine after 20 ms.
     *
     * There are no explicit assertions; the test passes when both coroutines complete.
     */
    @Test
    fun testSchedulerService() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val messagePrioritizationService =
                SampleMessagePrioritizationService(messagePrioritizationStateService)
            messagePrioritizationService.setConfiguration(configuration)

            val messagePrioritizationSchedulerService =
                MessagePrioritizationSchedulerService(messagePrioritizationService)
            launch {
                messagePrioritizationSchedulerService.startScheduling()
            }
            launch {
                // To debug, increase the delay time
                delay(20)
                messagePrioritizationSchedulerService.shutdownScheduling()
            }
        }
    }

    /**
     * Integration Kafka testing. Enable and use this test case only for local desktop testing
     * with a real Kafka broker.
     *
     * Registers a [DefaultMessagePrioritizeProcessor], starts the Kafka consumer, publishes three
     * sample sets to the "prioritize-input" producer, then shuts the consumer down after 10 seconds.
     */
    // @Test
    fun testKafkaMessagePrioritizationConsumer() {
        runBlocking {

            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val kafkaMessagePrioritizationService =
                SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
            kafkaMessagePrioritizationService.setConfiguration(configuration)

            val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
                messagePrioritizationStateService,
                kafkaMessagePrioritizationService
            )

            // Register the processor
            BluePrintDependencyService.registerSingleton(
                MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                defaultMessagePrioritizeProcessor
            )

            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
                bluePrintMessageLibPropertyService,
                kafkaMessagePrioritizationService
            )
            messagePrioritizationConsumer.startConsuming(configuration)

            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
            launch {
                // Plain samples, 100 ms pause before each message.
                MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { sample ->
                    delay(100)
                    sendKafkaSample(blueprintMessageProducerService, sample.id, sample.asJsonString(false))
                }

                // Same correlation group, 100 ms pause before each message.
                MessagePrioritizationSample
                    .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                    .forEach { sample ->
                        delay(100)
                        sendKafkaSample(blueprintMessageProducerService, sample.id, sample.asJsonString(false))
                    }

                // Different types sharing a correlation, 2 s pause before each message.
                MessagePrioritizationSample
                    .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                    .forEach { sample ->
                        delay(2000)
                        sendKafkaSample(blueprintMessageProducerService, sample.id, sample.asJsonString(false))
                    }
            }
            // Leave the consumer running for 10 s before shutting it down.
            delay(10000)
            messagePrioritizationConsumer.shutDown()
        }
    }

    /**
     * Integration NATS testing. Enable and use this test case only for local desktop testing
     * with a real NATS streaming server.
     *  Start :
     *  nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
     *
     * Starts the NATS consumer, publishes three sample sets to the input subject, then shuts the
     * consumer down after 3 seconds.
     */
    // @Test
    fun testNatsMessagePrioritizationConsumer() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            // assertNotNull returns the checked value, so no `!!` is needed afterwards.
            val natsConfiguration =
                assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")

            val inputSubject =
                NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject)

            val natsMessagePrioritizationService =
                SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
            natsMessagePrioritizationService.setConfiguration(configuration)

            val messagePrioritizationConsumer =
                NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
            messagePrioritizationConsumer.startConsuming()

            val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService

            launch {
                // Plain samples, 100 ms pause before each message.
                MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { sample ->
                    delay(100)
                    bluePrintNatsService.publish(inputSubject, sample.asJsonType().asByteArray())
                }

                // Same correlation group, 100 ms pause before each message.
                MessagePrioritizationSample
                    .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                    .forEach { sample ->
                        delay(100)
                        bluePrintNatsService.publish(inputSubject, sample.asJsonType().asByteArray())
                    }

                // Different types sharing a correlation, 200 ms pause before each message.
                MessagePrioritizationSample
                    .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                    .forEach { sample ->
                        delay(200)
                        bluePrintNatsService.publish(inputSubject, sample.asJsonType().asByteArray())
                    }
            }
            // Leave the consumer running for 3 s before shutting it down.
            delay(3000)
            messagePrioritizationConsumer.shutDown()
        }
    }

    /**
     * Sends one sample payload through [producer] with the fixed key "mykey" and a single
     * "id" header carrying [id].
     *
     * @param producer Kafka producer used to publish the message.
     * @param id identifier of the sample message, sent as the "id" header.
     * @param payload JSON string of the sample message.
     */
    private suspend fun sendKafkaSample(producer: KafkaMessageProducerService, id: String, payload: String) {
        val headers: MutableMap<String, String> = hashMapOf()
        headers["id"] = id
        producer.sendMessageNB(
            key = "mykey",
            message = payload,
            headers = headers
        )
    }
}