7e6bf68be3ca3b2579d128381d916d9faaf72487
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import io.mockk.coEvery
20 import io.mockk.every
21 import io.mockk.mockk
22 import io.mockk.spyk
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.junit.Before
27 import org.junit.runner.RunWith
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
42 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
43 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
44 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
45 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
46 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
47 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
52 import org.springframework.context.ApplicationContext
53 import org.springframework.test.annotation.DirtiesContext
54 import org.springframework.test.context.ContextConfiguration
55 import org.springframework.test.context.TestPropertySource
56 import org.springframework.test.context.junit4.SpringRunner
57 import kotlin.test.Test
58 import kotlin.test.assertNotNull
59
/**
 * Tests for the message prioritization function.
 *
 * Covers persisting messages through [MessagePrioritizationStateService], running the sample
 * prioritization service, starting/stopping the Kafka prioritization consumer against a spied
 * streaming consumer, and starting/stopping the scheduler service.
 *
 * Two additional integration tests (Kafka and NATS) are kept with their `@Test` annotation
 * commented out; they are meant to be enabled manually for local runs against real servers.
 * The Kafka/NATS properties below point at local endpoints and placeholder store paths.
 */
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
    classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
        MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
)
@TestPropertySource(
    properties =
    [
        "spring.jpa.show-sql=false",
        "spring.jpa.properties.hibernate.show_sql=false",
        "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",

        "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
        "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
        "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
        "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
        "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
        "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
        "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
        "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
        "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",

        // To send initial test message
        "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
        "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
        "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
        "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
        "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
        "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
        "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
        "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",

        "blueprintsprocessor.nats.cds-controller.type=token-auth",
        "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
        "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
    ]
)
open class MessagePrioritizationConsumerTest {

    private val log = logger(MessagePrioritizationConsumerTest::class)

    @Autowired
    lateinit var applicationContext: ApplicationContext

    @Autowired
    lateinit var prioritizationMessageRepository: PrioritizationMessageRepository

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    @Autowired
    lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService

    @Autowired
    lateinit var messagePrioritizationStateService: MessagePrioritizationStateService

    /** Hands the Spring context to [BluePrintDependencyService] so lookups through it work in the tests. */
    @Before
    fun setup() {
        BluePrintDependencyService.inject(applicationContext)
    }

    /**
     * Saves one sample message in state NEW through the [MessagePrioritizationStateService]
     * resolved from [BluePrintDependencyService] and verifies it can be read back by its id.
     */
    @Test
    fun testBluePrintKafkaJDBCKeyStore() {
        runBlocking {
            assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")

            val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
                .instance(MessagePrioritizationStateService::class)
            assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")

            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
                val message = messagePrioritizationService.saveMessage(it)
                val repoResult = messagePrioritizationService.getMessage(message.id)
                assertNotNull(repoResult, "failed to get inserted message.")
            }
        }
    }

    /**
     * Runs the sample prioritization service over three kinds of sample input:
     * messages without correlation, messages of the same group sharing a correlation,
     * and messages of different types sharing a correlation.
     * The test passes when no call to `prioritize` throws.
     */
    @Test
    fun testMessagePrioritizationService() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val messagePrioritizationService =
                SampleMessagePrioritizationService(messagePrioritizationStateService)
            messagePrioritizationService.setConfiguration(configuration)

            log.info("****************  without Correlation **************")
            /** Checking without correlation */
            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
                messagePrioritizationService.prioritize(it)
            }
            log.info("****************  Same Group , with Correlation **************")
            /** checking same group with correlation */
            MessagePrioritizationSample
                .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                .forEach {
                    delay(10)
                    messagePrioritizationService.prioritize(it)
                }
            log.info("****************  Different Type , with Correlation **************")
            /** checking different type, with correlation */
            MessagePrioritizationSample
                .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                .forEach {
                    delay(10)
                    messagePrioritizationService.prioritize(it)
                }
        }
    }

    /**
     * Verifies that the Kafka prioritization consumer can build its stream topology and can be
     * started and shut down. The underlying streaming consumer service is a spy whose `consume`
     * and `shutDown` are stubbed to do nothing, so no broker connection is attempted by them.
     */
    @Test
    fun testStartConsuming() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()

            val streamingConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
            assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")

            val spyStreamingConsumerService = spyk(streamingConsumerService)
            coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
            coEvery { spyStreamingConsumerService.shutDown() } returns Unit
            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
                bluePrintMessageLibPropertyService, mockk()
            )
            val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)

            // Test Topology
            val kafkaStreamConsumerFunction =
                spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
            val messageConsumerProperties = bluePrintMessageLibPropertyService
                .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
            val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
            assertNotNull(topology, "failed to get create topology")

            // Route the consumer to the stubbed streaming service before starting it.
            every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
            spyMessagePrioritizationConsumer.startConsuming(configuration)
            spyMessagePrioritizationConsumer.shutDown()
        }
    }

    /**
     * Starts the scheduler service in one coroutine and requests its shutdown from a second
     * coroutine after a short delay. `runBlocking` waits for both coroutines to complete.
     */
    @Test
    fun testSchedulerService() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val messagePrioritizationService =
                SampleMessagePrioritizationService(messagePrioritizationStateService)
            messagePrioritizationService.setConfiguration(configuration)

            val messagePrioritizationSchedulerService =
                MessagePrioritizationSchedulerService(messagePrioritizationService)
            launch {
                messagePrioritizationSchedulerService.startScheduling()
            }
            launch {
                /** To debug increase the delay time */
                delay(20)
                messagePrioritizationSchedulerService.shutdownScheduling()
            }
        }
    }

    /**
     * Integration Kafka Testing, Enable and use this test case only for local desktop testing
     * with real kafka broker.
     *
     * Starts the Kafka prioritization consumer, publishes three batches of sample messages to the
     * `prioritize-input` producer, waits 10 seconds and then shuts the consumer down.
     */
    // @Test
    fun testKafkaMessagePrioritizationConsumer() {
        runBlocking {

            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val kafkaMessagePrioritizationService =
                SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
            kafkaMessagePrioritizationService.setConfiguration(configuration)

            val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
                messagePrioritizationStateService,
                kafkaMessagePrioritizationService
            )

            // Register the processor
            BluePrintDependencyService.registerSingleton(
                MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                defaultMessagePrioritizeProcessor
            )

            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
                bluePrintMessageLibPropertyService,
                kafkaMessagePrioritizationService
            )
            messagePrioritizationConsumer.startConsuming(configuration)

            try {
                val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                    .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService

                /** Publish the sample messages, pausing before each one (100 ms, 100 ms, then 2000 ms per batch) */
                launch {
                    MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
                        delay(100)
                        sendToKafka(blueprintMessageProducerService, it.id, it.asJsonString(false))
                    }

                    MessagePrioritizationSample
                        .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                        .forEach {
                            delay(100)
                            sendToKafka(blueprintMessageProducerService, it.id, it.asJsonString(false))
                        }

                    MessagePrioritizationSample
                        .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                        .forEach {
                            delay(2000)
                            sendToKafka(blueprintMessageProducerService, it.id, it.asJsonString(false))
                        }
                }
                /** Give the consumer time to process what was published */
                delay(10000)
            } finally {
                // Always ask the started consumer to shut down, including when the producer lookup,
                // the publishing coroutine or the wait above fails.
                // NOTE(review): if the failure cancelled this coroutine and shutDown() suspends,
                // the shutdown may itself be cut short by cancellation - confirm against shutDown().
                messagePrioritizationConsumer.shutDown()
            }
        }
    }

    /**
     * Integration Nats Testing, Enable and use this test case only for local desktop testing
     * with real NATS streaming server.
     *  Start :
     *  nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
     *
     * Starts the NATS prioritization consumer, publishes three batches of sample messages to the
     * configured input subject, waits 3 seconds and then shuts the consumer down.
     * */
    // @Test
    fun testNatsMessagePrioritizationConsumer() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")

            val inputSubject =
                NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)

            val natsMessagePrioritizationService =
                SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
            natsMessagePrioritizationService.setConfiguration(configuration)

            val messagePrioritizationConsumer =
                NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
            messagePrioritizationConsumer.startConsuming()

            try {
                val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService

                /** Publish the sample messages, pausing before each one (100 ms, 100 ms, then 200 ms per batch) */
                launch {
                    MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
                        delay(100)
                        bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
                    }

                    MessagePrioritizationSample
                        .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                        .forEach {
                            delay(100)
                            bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
                        }

                    MessagePrioritizationSample
                        .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                        .forEach {
                            delay(200)
                            bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
                        }
                }
                /** Give the consumer time to process what was published */
                delay(3000)
            } finally {
                // Always ask the started consumer to shut down, including when publishing or the
                // wait above fails.
                // NOTE(review): if the failure cancelled this coroutine and shutDown() suspends,
                // the shutdown may itself be cut short by cancellation - confirm against shutDown().
                messagePrioritizationConsumer.shutDown()
            }
        }
    }

    /**
     * Sends one message through [producer] with a single `id` header.
     *
     * @param producer Kafka producer service used to send the message.
     * @param id value stored under the `id` header key.
     * @param payload message body, already serialized to a JSON string by the caller.
     */
    private suspend fun sendToKafka(producer: KafkaMessageProducerService, id: String, payload: String) {
        val headers: MutableMap<String, String> = hashMapOf()
        headers["id"] = id
        producer.sendMessageNB(
            message = payload,
            headers = headers
        )
    }
}