Prioritization Optional NATS consumer support
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumerTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import io.mockk.coEvery
20 import io.mockk.every
21 import io.mockk.mockk
22 import io.mockk.spyk
23 import kotlinx.coroutines.delay
24 import kotlinx.coroutines.launch
25 import kotlinx.coroutines.runBlocking
26 import org.junit.Before
27 import org.junit.runner.RunWith
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
37 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
38 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
42 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
43 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
44 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
45 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
46 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
47 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
48 import org.onap.ccsdk.cds.controllerblueprints.core.logger
49 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
50 import org.springframework.beans.factory.annotation.Autowired
51 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
52 import org.springframework.context.ApplicationContext
53 import org.springframework.test.annotation.DirtiesContext
54 import org.springframework.test.context.ContextConfiguration
55 import org.springframework.test.context.TestPropertySource
56 import org.springframework.test.context.junit4.SpringRunner
57 import kotlin.test.Test
58 import kotlin.test.assertNotNull
59
/**
 * Tests for the message prioritization function: persistence through
 * [MessagePrioritizationStateService], the sample prioritization service, the Kafka
 * prioritization consumer wiring and the prioritization scheduler.
 *
 * The Spring context is assembled from the message, NATS, property, prioritization and
 * test database configurations listed in [ContextConfiguration]; connection settings for
 * Kafka and NATS come from [TestPropertySource].
 *
 * Two integration tests at the bottom of the class have their `@Test` annotation commented
 * out because they need a locally running Kafka broker / NATS streaming server.
 */
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
    classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
        MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
)
@TestPropertySource(
    properties =
    [
        "spring.jpa.show-sql=false",
        "spring.jpa.properties.hibernate.show_sql=false",
        "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",

        "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
        "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
        "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",

        // To send initial test message
        "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
        "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",

        "blueprintsprocessor.nats.cds-controller.type=token-auth",
        "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
        "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
    ]
)
open class MessagePrioritizationConsumerTest {

    private val log = logger(MessagePrioritizationConsumerTest::class)

    @Autowired
    lateinit var applicationContext: ApplicationContext

    @Autowired
    lateinit var prioritizationMessageRepository: PrioritizationMessageRepository

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    @Autowired
    lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService

    @Autowired
    lateinit var messagePrioritizationStateService: MessagePrioritizationStateService

    /** Hands the Spring context to [BluePrintDependencyService] so tests can look beans up through it. */
    @Before
    fun setup() {
        BluePrintDependencyService.inject(applicationContext)
    }

    /**
     * Saves one sample message in state NEW through the state service resolved from
     * [BluePrintDependencyService] and checks that it can be read back by its id.
     */
    @Test
    fun testBluePrintKafkaJDBCKeyStore() {
        runBlocking {
            assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")

            val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
                .instance(MessagePrioritizationStateService::class)
            assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")

            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { sample ->
                val savedMessage = messagePrioritizationService.saveMessage(sample)
                val repoResult = messagePrioritizationService.getMessage(savedMessage.id)
                assertNotNull(repoResult, "failed to get inserted message.")
            }
        }
    }

    /**
     * Feeds three kinds of sample messages through [SampleMessagePrioritizationService]:
     * uncorrelated messages, messages of one group sharing a correlation, and messages of
     * different types sharing a correlation. The test only checks that prioritization runs
     * without throwing; it makes no assertions on the resulting state.
     */
    @Test
    fun testMessagePrioritizationService() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val messagePrioritizationService =
                SampleMessagePrioritizationService(messagePrioritizationStateService)
            messagePrioritizationService.setConfiguration(configuration)

            log.info("****************  without Correlation **************")
            /** Checking without correlation */
            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { sample ->
                messagePrioritizationService.prioritize(sample)
            }

            log.info("****************  Same Group , with Correlation **************")
            /** Checking same group with correlation; messages are spaced 10 ms apart */
            MessagePrioritizationSample
                .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                .forEach { sample ->
                    delay(10)
                    messagePrioritizationService.prioritize(sample)
                }

            log.info("****************  Different Type , with Correlation **************")
            /** Checking different type with correlation; messages are spaced 10 ms apart */
            MessagePrioritizationSample
                .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                .forEach { sample ->
                    delay(10)
                    messagePrioritizationService.prioritize(sample)
                }
        }
    }

    /**
     * Checks that the Kafka prioritization consumer can build its stream topology and that
     * `startConsuming` / `shutDown` run against a spied consumer service whose `consume`
     * and `shutDown` calls are stubbed to return `Unit`.
     */
    @Test
    fun testStartConsuming() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            // Fail with a readable assertion message instead of a bare NPE when the sample
            // configuration carries no Kafka section.
            val kafkaConfiguration =
                assertNotNull(configuration.kafkaConfiguration, "failed to get kafka Configuration")

            val streamingConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService(kafkaConfiguration.inputTopicSelector)
            assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")

            val spyStreamingConsumerService = spyk(streamingConsumerService)
            coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
            coEvery { spyStreamingConsumerService.shutDown() } returns Unit
            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
                bluePrintMessageLibPropertyService, mockk()
            )
            val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)

            // Test Topology
            val kafkaStreamConsumerFunction =
                spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
            val messageConsumerProperties = bluePrintMessageLibPropertyService
                .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
            val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
            assertNotNull(topology, "failed to get create topology")

            // Route the consumer to the stubbed service before starting it.
            every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
            spyMessagePrioritizationConsumer.startConsuming(configuration)
            spyMessagePrioritizationConsumer.shutDown()
        }
    }

    /**
     * Starts [MessagePrioritizationSchedulerService] in one coroutine and requests its
     * shutdown from a second coroutine after a short delay.
     */
    @Test
    fun testSchedulerService() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val messagePrioritizationService =
                SampleMessagePrioritizationService(messagePrioritizationStateService)
            messagePrioritizationService.setConfiguration(configuration)

            val messagePrioritizationSchedulerService =
                MessagePrioritizationSchedulerService(messagePrioritizationService)
            launch {
                messagePrioritizationSchedulerService.startScheduling()
            }
            launch {
                /** To debug increase the delay time */
                delay(20)
                messagePrioritizationSchedulerService.shutdownScheduling(configuration)
            }
        }
    }

    /**
     * Integration Kafka testing. Enable and use this test case only for local desktop
     * testing with a real Kafka broker.
     *
     * Registers a [DefaultMessagePrioritizeProcessor], starts the Kafka prioritization
     * consumer, publishes three batches of sample messages through the `prioritize-input`
     * producer and shuts the consumer down after 10 seconds.
     */
    // @Test
    fun testKafkaMessagePrioritizationConsumer() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            val kafkaMessagePrioritizationService =
                SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
            kafkaMessagePrioritizationService.setConfiguration(configuration)

            val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
                messagePrioritizationStateService,
                kafkaMessagePrioritizationService
            )

            // Register the processor
            BluePrintDependencyService.registerSingleton(
                MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                defaultMessagePrioritizeProcessor
            )

            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
                bluePrintMessageLibPropertyService,
                kafkaMessagePrioritizationService
            )
            messagePrioritizationConsumer.startConsuming(configuration)

            /**
             * Send the sample messages: 100 ms apart for the first two batches,
             * 2 seconds apart for the last one.
             */
            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
            launch {
                MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { sample ->
                    delay(100)
                    val headers: MutableMap<String, String> = hashMapOf("id" to sample.id)
                    blueprintMessageProducerService.sendMessageNB(
                        message = sample.asJsonString(false),
                        headers = headers
                    )
                }

                MessagePrioritizationSample
                    .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                    .forEach { sample ->
                        delay(100)
                        val headers: MutableMap<String, String> = hashMapOf("id" to sample.id)
                        blueprintMessageProducerService.sendMessageNB(
                            message = sample.asJsonString(false),
                            headers = headers
                        )
                    }

                MessagePrioritizationSample
                    .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                    .forEach { sample ->
                        delay(2000)
                        val headers: MutableMap<String, String> = hashMapOf("id" to sample.id)
                        blueprintMessageProducerService.sendMessageNB(
                            message = sample.asJsonString(false),
                            headers = headers
                        )
                    }
            }
            // Give the consumer time to process the published messages before stopping it.
            delay(10000)
            messagePrioritizationConsumer.shutDown()
        }
    }

    /**
     * Integration NATS testing. Enable and use this test case only for local desktop
     * testing with a real NATS streaming server.
     *
     * Start the server with:
     *  nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
     *
     * Starts the NATS prioritization consumer, publishes three batches of sample messages
     * to the configured input subject and shuts the consumer down after 3 seconds.
     */
    // @Test
    fun testNatsMessagePrioritizationConsumer() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
            // assertNotNull returns the non-null value, so no `!!` is needed afterwards.
            val natsConfiguration =
                assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")

            val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject)

            val natsMessagePrioritizationService =
                SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
            natsMessagePrioritizationService.setConfiguration(configuration)

            val messagePrioritizationConsumer =
                NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
            messagePrioritizationConsumer.startConsuming()

            /**
             * Send the sample messages: 100 ms apart for the first two batches,
             * 200 ms apart for the last one.
             */
            val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService

            launch {
                MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { sample ->
                    delay(100)
                    bluePrintNatsService.publish(inputSubject, sample.asJsonType().asByteArray())
                }

                MessagePrioritizationSample
                    .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                    .forEach { sample ->
                        delay(100)
                        bluePrintNatsService.publish(inputSubject, sample.asJsonType().asByteArray())
                    }

                MessagePrioritizationSample
                    .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                    .forEach { sample ->
                        delay(200)
                        bluePrintNatsService.publish(inputSubject, sample.asJsonType().asByteArray())
                    }
            }
            // Give the consumer time to process the published messages before stopping it.
            delay(3000)
            messagePrioritizationConsumer.shutDown()
        }
    }
}