REST endpoint for message prioritization
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumerTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import io.mockk.coEvery
20 import io.mockk.every
21 import io.mockk.spyk
22 import kotlinx.coroutines.delay
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.junit.Before
26 import org.junit.runner.RunWith
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
34 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
35 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
36 import org.onap.ccsdk.cds.controllerblueprints.core.logger
37 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
38 import org.springframework.beans.factory.annotation.Autowired
39 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
40 import org.springframework.context.ApplicationContext
41 import org.springframework.test.annotation.DirtiesContext
42 import org.springframework.test.context.ContextConfiguration
43 import org.springframework.test.context.TestPropertySource
44 import org.springframework.test.context.junit4.SpringRunner
45 import kotlin.test.Test
46 import kotlin.test.assertNotNull
47 import kotlin.test.assertTrue
48
/**
 * Tests for the message prioritization function, run with [SpringRunner] against the
 * configurations listed in [ContextConfiguration].
 *
 * The test properties declare a `prioritize-input` message consumer and producer that
 * point at `127.0.0.1:9092`. Only [testMessagePrioritizationConsumer] actually starts the
 * autowired consumer, which is why that test is kept disabled; [testStartConsuming]
 * replaces the consumer service's `consume`/`shutDown` with stubs.
 */
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
    classes = [BluePrintMessageLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
        MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
)
@TestPropertySource(
    properties =
    [
        "spring.jpa.show-sql=true",
        "spring.jpa.properties.hibernate.show_sql=true",
        "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",

        "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
        "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
        "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",

        // To send initial test message
        "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
        "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
    ]
)
open class MessagePrioritizationConsumerTest {

    private val log = logger(MessagePrioritizationConsumerTest::class)

    @Autowired
    lateinit var applicationContext: ApplicationContext

    @Autowired
    lateinit var prioritizationMessageRepository: PrioritizationMessageRepository

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    @Autowired
    lateinit var messagePrioritizationService: MessagePrioritizationService

    @Autowired
    lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer

    /** Hands the Spring context to [BluePrintDependencyService] so tests can look beans up through it. */
    @Before
    fun setup() {
        BluePrintDependencyService.inject(applicationContext)
    }

    /**
     * Saves one sample message in state NEW through [MessagePrioritizationStateService]
     * and checks that it can be read back by its id.
     */
    @Test
    fun testBluePrintKafkaJDBCKeyStore() {
        runBlocking {
            assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")

            // Deliberately not named `messagePrioritizationService`: that name belongs to the
            // autowired property of a different type, and shadowing it made this test misleading.
            val messagePrioritizationStateService: MessagePrioritizationStateService = BluePrintDependencyService
                .instance(MessagePrioritizationStateService::class)
            assertNotNull(messagePrioritizationStateService, "failed to get messagePrioritizationStateService")

            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { sample ->
                val message = messagePrioritizationStateService.saveMessage(sample)
                val repoResult = messagePrioritizationStateService.getMessage(message.id)
                assertNotNull(repoResult, "failed to get inserted message.")
            }
        }
    }

    /**
     * Feeds three sets of sample messages through [MessagePrioritizationService.prioritize]:
     * uncorrelated, same group with correlation, and different types with correlation.
     * The test passes when none of the calls throws.
     */
    @Test
    fun testMessagePrioritizationService() {
        runBlocking {
            assertTrue(
                ::messagePrioritizationService.isInitialized,
                "failed to initialize messagePrioritizationService"
            )

            // Checking without correlation
            log.info("****************  without Correlation **************")
            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { sample ->
                messagePrioritizationService.prioritize(sample)
            }

            // Checking same group with correlation
            log.info("****************  Same Group , with Correlation **************")
            MessagePrioritizationSample
                .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                .forEach { sample ->
                    delay(10)
                    messagePrioritizationService.prioritize(sample)
                }

            // Checking different type, with correlation
            log.info("****************  Different Type , with Correlation **************")
            MessagePrioritizationSample
                .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                .forEach { sample ->
                    delay(10)
                    messagePrioritizationService.prioritize(sample)
                }
        }
    }

    /**
     * Builds the stream topology from the sample configuration, then drives
     * `startConsuming`/`shutDown` on a spied [MessagePrioritizationConsumer] whose consumer
     * service has `consume` and `shutDown` stubbed out.
     */
    @Test
    fun testStartConsuming() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()

            val streamingConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService(configuration.inputTopicSelector)
            assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")

            // Stub the calls that would otherwise start/stop real consumption.
            val spyStreamingConsumerService = spyk(streamingConsumerService)
            coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
            coEvery { spyStreamingConsumerService.shutDown() } returns Unit

            // A fresh consumer instance is spied here; it is intentionally not stored under the
            // name of the autowired `messagePrioritizationConsumer` property.
            val spyMessagePrioritizationConsumer = spyk(
                MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
            )

            // Test Topology
            val kafkaStreamConsumerFunction =
                spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
            val messageConsumerProperties = bluePrintMessageLibPropertyService
                .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
            val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
            assertNotNull(topology, "failed to create topology")

            every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
            spyMessagePrioritizationConsumer.startConsuming(configuration)
            spyMessagePrioritizationConsumer.shutDown()
        }
    }

    /**
     * Kafka integration test. Enable and use this test case only for local desktop testing
     * with a real Kafka broker.
     *
     * Starts the autowired consumer, publishes three batches of sample messages through the
     * `prioritize-input` producer, waits ten seconds, then shuts the consumer down.
     */
    // @Test
    fun testMessagePrioritizationConsumer() {
        runBlocking {
            messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())

            // Only code after a successful start is guarded, so shutDown is attempted even when
            // the producer lookup or the wait below fails.
            try {
                val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                    .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService

                // Pause (ms) taken before each send, paired with a supplier of the batch. Suppliers
                // keep sample creation lazy: each batch is built only once the previous one is sent.
                val batches = listOf(
                    100L to { MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2) },
                    100L to {
                        MessagePrioritizationSample
                            .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                    },
                    2000L to {
                        MessagePrioritizationSample
                            .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                    }
                )

                launch {
                    batches.forEach { (pauseMillis, sampleSupplier) ->
                        sampleSupplier().forEach { sample ->
                            delay(pauseMillis)
                            blueprintMessageProducerService.sendMessageNB(
                                message = sample.asJsonString(false),
                                headers = hashMapOf("id" to sample.id)
                            )
                        }
                    }
                }

                // Give the consumer time to process what was published before stopping it.
                delay(10000)
            } finally {
                messagePrioritizationConsumer.shutDown()
            }
        }
    }
}