f9e23e826a51088bdc0fd0688ef008c58ffeb224
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import io.mockk.coEvery
20 import io.mockk.every
21 import io.mockk.spyk
22 import kotlinx.coroutines.delay
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.junit.Before
26 import org.junit.runner.RunWith
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
29 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
30 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
31 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
33 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
34 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
35 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
36 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
37 import org.springframework.beans.factory.annotation.Autowired
38 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
39 import org.springframework.context.ApplicationContext
40 import org.springframework.test.annotation.DirtiesContext
41 import org.springframework.test.context.ContextConfiguration
42 import org.springframework.test.context.TestPropertySource
43 import org.springframework.test.context.junit4.SpringRunner
44 import kotlin.test.Test
45 import kotlin.test.assertNotNull
46
/**
 * Spring-backed tests for [MessagePrioritizationConsumer].
 *
 * Covers three scenarios:
 * - saving a sample message through [MessagePrioritizationStateService] and reading it back,
 * - building the Kafka Streams topology and driving start/shutdown against a stubbed consumer service,
 * - a manually enabled integration run against a real Kafka broker (see the test property source, which
 *   points both consumer and producer at `127.0.0.1:9092`).
 */
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
    classes = [BluePrintMessageLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
        MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
)
@TestPropertySource(
    properties =
    [
        "spring.jpa.show-sql=true",
        "spring.jpa.properties.hibernate.show_sql=true",
        "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",

        "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
        "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
        "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",

        // To send initial test message
        "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
        "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
    ]
)
open class MessagePrioritizationConsumerTest {

    @Autowired
    lateinit var applicationContext: ApplicationContext

    @Autowired
    lateinit var prioritizationMessageRepository: PrioritizationMessageRepository

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    @Autowired
    lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer

    /** Hands the Spring context to [BluePrintDependencyService] so tests can look beans up through it. */
    @Before
    fun setup() {
        BluePrintDependencyService.inject(applicationContext)
    }

    /**
     * Saves each sample message through [MessagePrioritizationStateService] and verifies that it can be
     * fetched again by the id of the saved entity.
     */
    @Test
    fun testBluePrintKafkaJDBCKeyStore() {
        runBlocking {
            assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")

            val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
                .instance(MessagePrioritizationStateService::class)
            assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")

            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { sample ->
                val savedMessage = messagePrioritizationService.saveMessage(sample)
                val repoResult = messagePrioritizationService.getMessage(savedMessage.id)
                assertNotNull(repoResult, "failed to get inserted message.")
            }
        }
    }

    /**
     * Verifies that a topology can be created from the consumer's Kafka Streams function and that
     * `startConsuming` / `shutDown` run against a consumer service whose `consume` and `shutDown`
     * calls are stubbed out.
     */
    @Test
    fun testStartConsuming() {
        runBlocking {
            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()

            val streamingConsumerService = bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService(configuration.inputTopicSelector)
            assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")

            // Stub consume/shutDown on the spy so the real consumer implementation is not executed.
            val spyStreamingConsumerService = spyk(streamingConsumerService)
            coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
            coEvery { spyStreamingConsumerService.shutDown() } returns Unit

            // Built locally (instead of using the autowired field) so it can be spied without
            // shadowing the class-level messagePrioritizationConsumer property.
            val spyMessagePrioritizationConsumer =
                spyk(MessagePrioritizationConsumer(bluePrintMessageLibPropertyService))

            // Test Topology
            val kafkaStreamConsumerFunction =
                spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
            val messageConsumerProperties = bluePrintMessageLibPropertyService
                .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
            val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
            assertNotNull(topology, "failed to get create topology")

            // Route the consumer to the stubbed service before starting it.
            every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
            spyMessagePrioritizationConsumer.startConsuming(configuration)
            spyMessagePrioritizationConsumer.shutDown()
        }
    }

    /**
     * Integration Kafka testing. Enable and use this test case only for local desktop testing with a real
     * Kafka broker.
     *
     * Starts the autowired consumer, publishes three batches of sample messages from a child coroutine,
     * waits, and then shuts the consumer down. The shutdown is placed in a `finally` block so that a
     * failure while resolving the producer or waiting does not skip it.
     */
    // @Test
    fun testMessagePrioritizationConsumer() {
        runBlocking {
            messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())

            try {
                val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                    .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService

                // Publish concurrently with the wait below: the first two batches pause 100 ms before
                // each message, the last batch pauses 2000 ms before each message.
                launch {
                    MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
                        sendSampleMessage(blueprintMessageProducerService, it.id, it.asJsonString(false), 100L)
                    }

                    MessagePrioritizationSample
                        .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
                        .forEach {
                            sendSampleMessage(blueprintMessageProducerService, it.id, it.asJsonString(false), 100L)
                        }

                    MessagePrioritizationSample
                        .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
                        .forEach {
                            sendSampleMessage(blueprintMessageProducerService, it.id, it.asJsonString(false), 2000L)
                        }
                }

                // Presumably long enough for the sends above to complete and be consumed before shutdown.
                delay(10000)
            } finally {
                // NOTE(review): if shutDown() suspends and this coroutine was cancelled (e.g. the sender
                // child failed), the call may itself be cancelled; consider NonCancellable if that matters.
                messagePrioritizationConsumer.shutDown()
            }
        }
    }

    /**
     * Waits [pauseMillis] milliseconds, then sends [payload] through [producer] with a single `id` header.
     *
     * @param producer producer used to publish the message.
     * @param id value stored under the `id` header key.
     * @param payload message body passed to the producer as-is.
     * @param pauseMillis delay applied before the message is sent.
     */
    private suspend fun sendSampleMessage(
        producer: KafkaBasicAuthMessageProducerService,
        id: String,
        payload: String,
        pauseMillis: Long
    ) {
        delay(pauseMillis)
        val headers: MutableMap<String, String> = hashMapOf()
        headers["id"] = id
        producer.sendMessageNB(
            message = payload,
            headers = headers
        )
    }
}