Prioritization expiry and clean scheduler service
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumerTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import io.mockk.coEvery
20 import io.mockk.every
21 import io.mockk.spyk
22 import kotlinx.coroutines.delay
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.junit.Before
26 import org.junit.runner.RunWith
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
32 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
34 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
35 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
36 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
37 import org.onap.ccsdk.cds.controllerblueprints.core.logger
38 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
39 import org.springframework.beans.factory.annotation.Autowired
40 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
41 import org.springframework.context.ApplicationContext
42 import org.springframework.test.annotation.DirtiesContext
43 import org.springframework.test.context.ContextConfiguration
44 import org.springframework.test.context.TestPropertySource
45 import org.springframework.test.context.junit4.SpringRunner
46 import kotlin.test.Test
47 import kotlin.test.assertNotNull
48 import kotlin.test.assertTrue
49
50 @RunWith(SpringRunner::class)
51 @DataJpaTest
52 @DirtiesContext
53 @ContextConfiguration(
54     classes = [BluePrintMessageLibConfiguration::class,
55         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
56         MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
57 )
58 @TestPropertySource(
59     properties =
60     [
61         "spring.jpa.show-sql=true",
62         "spring.jpa.properties.hibernate.show_sql=true",
63         "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
64
65         "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
66         "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
67         "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
68         "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
69
70         // To send initial test message
71         "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
72         "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
73         "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
74     ]
75 )
76 open class MessagePrioritizationConsumerTest {
77
78     private val log = logger(MessagePrioritizationConsumerTest::class)
79
80     @Autowired
81     lateinit var applicationContext: ApplicationContext
82
83     @Autowired
84     lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
85
86     @Autowired
87     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
88
89     @Autowired
90     lateinit var messagePrioritizationService: MessagePrioritizationService
91
92     @Autowired
93     lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
94
95     @Autowired
96     lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
97
98     @Before
99     fun setup() {
100         BluePrintDependencyService.inject(applicationContext)
101     }
102
103     @Test
104     fun testBluePrintKafkaJDBCKeyStore() {
105         runBlocking {
106             assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
107
108             val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
109                 .instance(MessagePrioritizationStateService::class)
110             assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
111
112             MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
113                 val message = messagePrioritizationService.saveMessage(it)
114                 val repoResult = messagePrioritizationService.getMessage(message.id)
115                 assertNotNull(repoResult, "failed to get inserted message.")
116             }
117         }
118     }
119
120     @Test
121     fun testMessagePrioritizationService() {
122         runBlocking {
123             assertTrue(
124                 ::messagePrioritizationService.isInitialized,
125                 "failed to initialize messagePrioritizationService"
126             )
127
128             log.info("****************  without Correlation **************")
129             /** Checking without correlation */
130             MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
131                 messagePrioritizationService.prioritize(it)
132             }
133             log.info("****************  Same Group , with Correlation **************")
134             /** checking same group with correlation */
135             MessagePrioritizationSample
136                 .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
137                 .forEach {
138                     delay(10)
139                     messagePrioritizationService.prioritize(it)
140                 }
141             log.info("****************  Different Type , with Correlation **************")
142             /** checking different type, with correlation */
143             MessagePrioritizationSample
144                 .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
145                 .forEach {
146                     delay(10)
147                     messagePrioritizationService.prioritize(it)
148                 }
149         }
150     }
151
152     @Test
153     fun testStartConsuming() {
154         runBlocking {
155             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
156
157             val streamingConsumerService = bluePrintMessageLibPropertyService
158                 .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
159             assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
160
161             val spyStreamingConsumerService = spyk(streamingConsumerService)
162             coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
163             coEvery { spyStreamingConsumerService.shutDown() } returns Unit
164             val messagePrioritizationConsumer = MessagePrioritizationConsumer(
165                 bluePrintMessageLibPropertyService
166             )
167             val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
168
169             // Test Topology
170             val kafkaStreamConsumerFunction =
171                 spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
172             val messageConsumerProperties = bluePrintMessageLibPropertyService
173                 .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
174             val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
175             assertNotNull(topology, "failed to get create topology")
176
177             every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
178             spyMessagePrioritizationConsumer.startConsuming(configuration)
179             spyMessagePrioritizationConsumer.shutDown()
180         }
181     }
182
183     @Test
184     fun testSchedulerService() {
185         runBlocking {
186             val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
187             assertTrue(
188                 ::messagePrioritizationSchedulerService.isInitialized,
189                 "failed to initialize messagePrioritizationSchedulerService"
190             )
191             launch {
192                 messagePrioritizationSchedulerService.startScheduling(configuration)
193             }
194             launch {
195                 /** To debug increase the delay time */
196                 delay(20)
197                 messagePrioritizationSchedulerService.shutdownScheduling(configuration)
198             }
199         }
200     }
201
202     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
203     // @Test
204     fun testMessagePrioritizationConsumer() {
205         runBlocking {
206             messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
207
208             /** Send sample message with every 1 sec */
209             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
210                 .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
211             launch {
212                 MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
213                     delay(100)
214                     val headers: MutableMap<String, String> = hashMapOf()
215                     headers["id"] = it.id
216                     blueprintMessageProducerService.sendMessageNB(
217                         message = it.asJsonString(false),
218                         headers = headers
219                     )
220                 }
221
222                 MessagePrioritizationSample
223                     .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
224                     .forEach {
225                         delay(100)
226                         val headers: MutableMap<String, String> = hashMapOf()
227                         headers["id"] = it.id
228                         blueprintMessageProducerService.sendMessageNB(
229                             message = it.asJsonString(false),
230                             headers = headers
231                         )
232                     }
233
234                 MessagePrioritizationSample
235                     .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
236                     .forEach {
237                         delay(2000)
238                         val headers: MutableMap<String, String> = hashMapOf()
239                         headers["id"] = it.id
240                         blueprintMessageProducerService.sendMessageNB(
241                             message = it.asJsonString(false),
242                             headers = headers
243                         )
244                     }
245             }
246             delay(10000)
247             messagePrioritizationConsumer.shutDown()
248         }
249     }
250 }