Truncate message published on Kafka / Spike: Define solution for logs separation
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageProducerServiceTest.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
18
19 import io.mockk.every
20 import io.mockk.mockk
21 import io.mockk.spyk
22 import kotlinx.coroutines.runBlocking
23 import org.apache.kafka.clients.CommonClientConfigs
24 import org.apache.kafka.clients.consumer.ConsumerConfig
25 import org.apache.kafka.clients.producer.KafkaProducer
26 import org.apache.kafka.clients.producer.ProducerConfig
27 import org.apache.kafka.clients.producer.RecordMetadata
28 import org.apache.kafka.common.config.SaslConfigs
29 import org.apache.kafka.common.config.SslConfigs
30 import org.apache.kafka.common.security.auth.SecurityProtocol
31 import org.apache.kafka.common.security.scram.ScramLoginModule
32 import org.apache.kafka.common.serialization.ByteArraySerializer
33 import org.apache.kafka.common.serialization.StringSerializer
34 import org.junit.runner.RunWith
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
37 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
38 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
39 import org.springframework.beans.factory.annotation.Autowired
40 import org.springframework.test.annotation.DirtiesContext
41 import org.springframework.test.context.ContextConfiguration
42 import org.springframework.test.context.TestPropertySource
43 import org.springframework.test.context.junit4.SpringRunner
44 import java.util.concurrent.Future
45 import kotlin.test.Test
46 import kotlin.test.assertEquals
47 import kotlin.test.assertTrue
48
49 @RunWith(SpringRunner::class)
50 @DirtiesContext
51 @ContextConfiguration(
52     classes = [BluePrintMessageLibConfiguration::class,
53         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
54 )
55 @TestPropertySource(
56     properties =
57     ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
58         "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
59         "blueprintsprocessor.messageproducer.sample.topic=default-topic",
60         "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
61         "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
62         "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
63         "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
64         "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
65         "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
66         "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
67     ]
68 )
69 open class BlueprintMessageProducerServiceTest {
70
71     @Autowired
72     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
73
74     @Test
75     fun testKafkaScramSslAuthProducerService() {
76         runBlocking {
77             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
78                 .blueprintMessageProducerService("sample") as KafkaMessageProducerService
79
80             val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
81
82             val responseMock = mockk<Future<RecordMetadata>>()
83             every { responseMock.get() } returns mockk()
84
85             every { mockKafkaTemplate.send(any(), any()) } returns responseMock
86
87             val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true)
88
89             every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate
90
91             val response = spyBluePrintMessageProducerService.sendMessage("Testing message")
92             assertTrue(response, "failed to get command response")
93         }
94     }
95
96     @Test
97     fun testKafkaScramSslAuthConfig() {
98         val expectedConfig = mapOf<String, Any>(
99                 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
100                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
101                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
102                 ProducerConfig.ACKS_CONFIG to "all",
103                 ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
104                 ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
105                 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
106                 SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
107                 SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
108                 SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
109                 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
110                 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
111                 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
112                 SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
113                 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
114                 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
115                         "username=\"sample-user\" " +
116                         "password=\"secretpassword\";"
117         )
118
119         val messageProducerProperties = bluePrintMessageLibPropertyService
120                 .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample")
121
122         val configProps = messageProducerProperties.getConfig()
123
124         assertEquals(messageProducerProperties.topic,
125                 "default-topic",
126                 "Topic doesn't match the expected value"
127         )
128         assertEquals(messageProducerProperties.type,
129                 "kafka-scram-ssl-auth",
130                 "Authentication type doesn't match the expected value")
131
132         expectedConfig.forEach {
133             assertTrue(configProps.containsKey(it.key),
134                     "Missing expected kafka config key : ${it.key}"
135             )
136             assertEquals(configProps[it.key],
137                     it.value,
138                     "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
139             )
140         }
141     }
142 }