Enable JUnit tests and port to Java 17
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritization / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumerTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  * Modifications Copyright © 2021 Bell Canada.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
19
20 import io.micrometer.core.instrument.MeterRegistry
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.runBlocking
24 import org.junit.Before
25 import org.junit.runner.RunWith
26 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
28 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
33 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
34 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
35 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
37 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
38 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
39 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
40 import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
41 import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
42 import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
43 import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
44 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
45 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
46 import org.onap.ccsdk.cds.controllerblueprints.core.logger
47 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
48 import org.springframework.beans.factory.annotation.Autowired
49 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
50 import org.springframework.boot.test.mock.mockito.MockBean
51 import org.springframework.context.ApplicationContext
52 import org.springframework.test.annotation.DirtiesContext
53 import org.springframework.test.context.ContextConfiguration
54 import org.springframework.test.context.TestPropertySource
55 import org.springframework.test.context.junit4.SpringRunner
56 import kotlin.test.Test
57 import kotlin.test.assertNotNull
58
59 @RunWith(SpringRunner::class)
60 @DataJpaTest
61 @DirtiesContext
62 @ContextConfiguration(
63     classes = [
64         BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
65         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
66         MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
67     ]
68 )
69 @TestPropertySource(
70     properties =
71         [
72             "spring.jpa.show-sql=false",
73             "spring.jpa.properties.hibernate.show_sql=false",
74             "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
75
76             "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
77             "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
78             "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
79             "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
80             "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
81             "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
82             "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
83             "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
84             "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
85             "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
86
87             // To send initial test message
88             "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
89             "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
90             "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
91             "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
92             "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
93             "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
94             "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
95             "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
96             "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
97
98             "blueprintsprocessor.nats.cds-controller.type=token-auth",
99             "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
100             "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
101         ]
102 )
103 open class MessagePrioritizationConsumerTest {
104
105     private val log = logger(MessagePrioritizationConsumerTest::class)
106
107     @Autowired
108     lateinit var applicationContext: ApplicationContext
109
110     @Autowired
111     lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
112
113     @Autowired
114     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
115
116     @Autowired
117     lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
118
119     @Autowired
120     lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
121
122     @MockBean
123     lateinit var meterRegistry: MeterRegistry
124
    /**
     * Runs before each test and hands the autowired Spring [applicationContext] to
     * [BluePrintDependencyService.inject]. [testBluePrintKafkaJDBCKeyStore] later resolves a
     * service through [BluePrintDependencyService.instance].
     */
    @Before
    fun setup() {
        BluePrintDependencyService.inject(applicationContext)
    }
129
130     @Test
131     fun testBluePrintKafkaJDBCKeyStore() {
132         runBlocking {
133             assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
134
135             val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
136                 .instance(MessagePrioritizationStateService::class)
137             assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
138
139             MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
140                 val message = messagePrioritizationService.saveMessage(it)
141                 val repoResult = messagePrioritizationService.getMessage(message.id)
142                 assertNotNull(repoResult, "failed to get inserted message.")
143             }
144         }
145     }
146
147     @Test
148     fun testMessagePrioritizationService() {
149         runBlocking {
150             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
151             val messagePrioritizationService =
152                 SampleMessagePrioritizationService(messagePrioritizationStateService)
153             messagePrioritizationService.setConfiguration(configuration)
154
155             log.info("****************  without Correlation **************")
156             /** Checking without correlation */
157             MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
158                 messagePrioritizationService.prioritize(it)
159             }
160             log.info("****************  Same Group , with Correlation **************")
161             /** checking same group with correlation */
162             MessagePrioritizationSample
163                 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
164                 .forEach {
165                     delay(10)
166                     messagePrioritizationService.prioritize(it)
167                 }
168             log.info("****************  Different Type , with Correlation **************")
169             /** checking different type, with correlation */
170             MessagePrioritizationSample
171                 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
172                 .forEach {
173                     delay(10)
174                     messagePrioritizationService.prioritize(it)
175                 }
176         }
177     }
178
179     @Test
180     fun testSchedulerService() {
181         runBlocking {
182             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
183             val messagePrioritizationService =
184                 SampleMessagePrioritizationService(messagePrioritizationStateService)
185             messagePrioritizationService.setConfiguration(configuration)
186
187             val messagePrioritizationSchedulerService =
188                 MessagePrioritizationSchedulerService(messagePrioritizationService)
189             launch {
190                 messagePrioritizationSchedulerService.startScheduling()
191             }
192             launch {
193                 /** To debug increase the delay time */
194                 delay(20)
195                 messagePrioritizationSchedulerService.shutdownScheduling()
196             }
197         }
198     }
199
200     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
201     // @Test
202     fun testKafkaMessagePrioritizationConsumer() {
203         runBlocking {
204
205             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
206             val kafkaMessagePrioritizationService =
207                 SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
208             kafkaMessagePrioritizationService.setConfiguration(configuration)
209
210             val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
211                 messagePrioritizationStateService,
212                 kafkaMessagePrioritizationService
213             )
214
215             // Register the processor
216             BluePrintDependencyService.registerSingleton(
217                 MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
218                 defaultMessagePrioritizeProcessor
219             )
220
221             val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
222                 bluePrintMessageLibPropertyService,
223                 kafkaMessagePrioritizationService
224             )
225             messagePrioritizationConsumer.startConsuming(configuration)
226
227             /** Send sample message with every 1 sec */
228             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
229                 .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
230             launch {
231                 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
232                     delay(100)
233                     val headers: MutableMap<String, String> = hashMapOf()
234                     headers["id"] = it.id
235                     blueprintMessageProducerService.sendMessageNB(
236                         key = "mykey",
237                         message = it.asJsonString(false),
238                         headers = headers
239                     )
240                 }
241
242                 MessagePrioritizationSample
243                     .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
244                     .forEach {
245                         delay(100)
246                         val headers: MutableMap<String, String> = hashMapOf()
247                         headers["id"] = it.id
248                         blueprintMessageProducerService.sendMessageNB(
249                             key = "mykey",
250                             message = it.asJsonString(false),
251                             headers = headers
252                         )
253                     }
254
255                 MessagePrioritizationSample
256                     .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
257                     .forEach {
258                         delay(2000)
259                         val headers: MutableMap<String, String> = hashMapOf()
260                         headers["id"] = it.id
261                         blueprintMessageProducerService.sendMessageNB(
262                             key = "mykey",
263                             message = it.asJsonString(false),
264                             headers = headers
265                         )
266                     }
267             }
268             delay(10000)
269             messagePrioritizationConsumer.shutDown()
270         }
271     }
272
273     /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
274      *  Start :
275      *  nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
276      * */
277     // @Test
278     fun testNatsMessagePrioritizationConsumer() {
279         runBlocking {
280             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
281             assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
282
283             val inputSubject =
284                 NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
285
286             val natsMessagePrioritizationService =
287                 SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
288             natsMessagePrioritizationService.setConfiguration(configuration)
289
290             val messagePrioritizationConsumer =
291                 NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
292             messagePrioritizationConsumer.startConsuming()
293
294             /** Send sample message with every 1 sec */
295             val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
296
297             launch {
298                 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
299                     delay(100)
300                     bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
301                 }
302
303                 MessagePrioritizationSample
304                     .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
305                     .forEach {
306                         delay(100)
307                         bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
308                     }
309
310                 MessagePrioritizationSample
311                     .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
312                     .forEach {
313                         delay(200)
314                         bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
315                     }
316             }
317             delay(3000)
318             messagePrioritizationConsumer.shutDown()
319         }
320     }
321 }