Renaming Files having BluePrint to have Blueprint
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageProducerServiceTest.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
18
19 import io.mockk.every
20 import io.mockk.mockk
21 import io.mockk.spyk
22 import kotlinx.coroutines.runBlocking
23 import org.apache.kafka.clients.CommonClientConfigs
24 import org.apache.kafka.clients.consumer.ConsumerConfig
25 import org.apache.kafka.clients.producer.KafkaProducer
26 import org.apache.kafka.clients.producer.ProducerConfig
27 import org.apache.kafka.clients.producer.RecordMetadata
28 import org.apache.kafka.common.config.SaslConfigs
29 import org.apache.kafka.common.config.SslConfigs
30 import org.apache.kafka.common.security.auth.SecurityProtocol
31 import org.apache.kafka.common.security.scram.ScramLoginModule
32 import org.apache.kafka.common.serialization.ByteArraySerializer
33 import org.apache.kafka.common.serialization.StringSerializer
34 import org.junit.runner.RunWith
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService
36 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
37 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration
38 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
39 import org.springframework.beans.factory.annotation.Autowired
40 import org.springframework.test.annotation.DirtiesContext
41 import org.springframework.test.context.ContextConfiguration
42 import org.springframework.test.context.TestPropertySource
43 import org.springframework.test.context.junit4.SpringRunner
44 import java.util.concurrent.Future
45 import kotlin.test.Test
46 import kotlin.test.assertEquals
47 import kotlin.test.assertTrue
48
/**
 * Spring-context tests for the message producer registered under the `sample`
 * selector, which the test properties below declare with type
 * `kafka-scram-ssl-auth`.
 *
 * The context is built from [BlueprintMessageLibConfiguration],
 * [BlueprintPropertyConfiguration] and [BlueprintPropertiesService]; all
 * producer settings come from the inline [TestPropertySource] properties.
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
    classes = [
        BlueprintMessageLibConfiguration::class,
        BlueprintPropertyConfiguration::class, BlueprintPropertiesService::class
    ]
)
@TestPropertySource(
    properties =
        [
            "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
            "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageproducer.sample.topic=default-topic",
            "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
            "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
            "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
            "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
            "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
        ]
)
open class BlueprintMessageProducerServiceTest {

    /** Property service under test; injected by the Spring test context. */
    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService

    /**
     * Sends a message through the `sample` producer service with the Kafka
     * producer replaced by a mock, and expects `sendMessage` to report success.
     */
    @Test
    fun testKafkaScramSslAuthProducerService() {
        runBlocking {
            val producerService = bluePrintMessageLibPropertyService
                .blueprintMessageProducerService("sample") as KafkaMessageProducerService

            // Future handed back by the mocked producer's send(); get() resolves to a relaxed stub.
            val sendResult = mockk<Future<RecordMetadata>>()
            every { sendResult.get() } returns mockk()

            val kafkaProducerMock = mockk<KafkaProducer<String, ByteArray>>()
            every { kafkaProducerMock.send(any(), any()) } returns sendResult

            // Spy on the real service so messageTemplate() hands out the mock
            // instead of building a real KafkaProducer.
            val producerServiceSpy = spyk(producerService, recordPrivateCalls = true)
            every { producerServiceSpy.messageTemplate(any()) } returns kafkaProducerMock

            val response = producerServiceSpy.sendMessage("mykey", "Testing message")
            assertTrue(response, "failed to get command response")
        }
    }

    /**
     * Checks that the producer properties resolved for the `sample` selector
     * expose the configured topic and type, and that `getConfig()` contains
     * every expected Kafka client setting with the expected value.
     */
    @Test
    fun testKafkaScramSslAuthConfig() {
        val expectedConfig = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
            ProducerConfig.ACKS_CONFIG to "all",
            ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
            ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
            ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
            SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
            SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
            SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
            SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
            SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
                "username=\"sample-user\" " +
                "password=\"secretpassword\";"
        )

        val messageProducerProperties = bluePrintMessageLibPropertyService
            .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample")

        val configProps = messageProducerProperties.getConfig()

        // kotlin.test.assertEquals takes (expected, actual, message); keeping that
        // order makes a failure report the right value as "expected".
        assertEquals(
            "default-topic",
            messageProducerProperties.topic,
            "Topic doesn't match the expected value"
        )
        assertEquals(
            "kafka-scram-ssl-auth",
            messageProducerProperties.type,
            "Authentication type doesn't match the expected value"
        )

        expectedConfig.forEach { (key, expectedValue) ->
            // Check presence first so a missing key is reported as such rather
            // than as a null-vs-value mismatch.
            assertTrue(
                configProps.containsKey(key),
                "Missing expected kafka config key : $key"
            )
            assertEquals(
                expectedValue,
                configProps[key],
                "Unexpected value for $key got ${configProps[key]} instead of $expectedValue"
            )
        }
    }
}