bb35b6614d1bf76fb72c8eefcdd28a0a294c7c26
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2021 Bell Canada.
4  *  Modification Copyright (C) 2022 Nordix Foundation.
5  *
6  *  Licensed under the Apache License, Version 2.0 (the "License");
7  *  you may not use this file except in compliance with the License.
8  *  You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  */
18
19 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20
21 import io.micrometer.core.instrument.MeterRegistry
22 import io.mockk.every
23 import io.mockk.mockk
24 import io.mockk.spyk
25 import kotlinx.coroutines.runBlocking
26 import org.apache.kafka.clients.CommonClientConfigs
27 import org.apache.kafka.clients.consumer.ConsumerConfig
28 import org.apache.kafka.clients.producer.KafkaProducer
29 import org.apache.kafka.clients.producer.ProducerConfig
30 import org.apache.kafka.clients.producer.RecordMetadata
31 import org.apache.kafka.common.config.SaslConfigs
32 import org.apache.kafka.common.config.SslConfigs
33 import org.apache.kafka.common.security.auth.SecurityProtocol
34 import org.apache.kafka.common.security.scram.ScramLoginModule
35 import org.apache.kafka.common.serialization.ByteArraySerializer
36 import org.apache.kafka.common.serialization.StringSerializer
37 import org.junit.runner.RunWith
38 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
39 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
40 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
41 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
42 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants.Companion.PROPERTY_MESSAGE_PRODUCER_PREFIX
43 import org.springframework.beans.factory.annotation.Autowired
44 import org.springframework.boot.test.mock.mockito.MockBean
45 import org.springframework.test.annotation.DirtiesContext
46 import org.springframework.test.context.ContextConfiguration
47 import org.springframework.test.context.TestPropertySource
48 import org.springframework.test.context.junit4.SpringRunner
49 import java.util.concurrent.Future
50 import kotlin.test.Test
51 import kotlin.test.assertEquals
52 import kotlin.test.assertTrue
53
/**
 * Tests for the Kafka message producer service and for the producer properties that
 * [BluePrintMessageLibPropertyService] resolves from the `blueprintsprocessor.messageproducer.*`
 * entries declared in the [TestPropertySource] of this class.
 *
 * Two producer selectors are configured:
 * - `sample`  : type `kafka-scram-ssl-auth`
 * - `sample2` : type `kafka-scram-plain-text-auth`
 */
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(
    classes = [
        BluePrintMessageLibConfiguration::class,
        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class
    ]
)
@TestPropertySource(
    properties =
        [
            "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
            "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageproducer.sample.topic=default-topic",
            "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
            "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
            "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
            "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
            "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
            "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",

            "blueprintsprocessor.messageproducer.sample2.type=kafka-scram-plain-text-auth",
            "blueprintsprocessor.messageproducer.sample2.bootstrapServers=127.0.0.1:9092",
            "blueprintsprocessor.messageproducer.sample2.topic=default-topic",
            "blueprintsprocessor.messageproducer.sample2.clientId=default-client-id",
            "blueprintsprocessor.messageproducer.sample2.scramUsername=sample-user",
            "blueprintsprocessor.messageproducer.sample2.scramPassword=secretpassword"
        ]
)
open class BlueprintMessageProducerServiceTest {

    @Autowired
    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService

    @MockBean
    lateinit var meterRegistry: MeterRegistry

    /** Producer settings that both authentication flavours under test are expected to expose. */
    private val commonProducerConfig: Map<String, Any> = mapOf(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
        ProducerConfig.ACKS_CONFIG to "all",
        ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
        ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true
    )

    /**
     * Sends a message through the `sample` producer service with its Kafka producer
     * replaced by a mock, and expects `sendMessage` to report success.
     */
    @Test
    fun testKafkaScramSslAuthProducerService() {
        runBlocking {
            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                .blueprintMessageProducerService("sample") as KafkaMessageProducerService

            val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()

            val responseMock = mockk<Future<RecordMetadata>>()
            every { responseMock.get() } returns mockk()

            every { mockKafkaTemplate.send(any(), any()) } returns responseMock

            val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true)

            // Hand the mocked producer to the service instead of the one it would build itself.
            every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate

            val response = spyBluePrintMessageProducerService.sendMessage("mykey", "Testing message")
            assertTrue(response, "failed to get command response")
        }
    }

    /** Checks the Kafka configuration generated for the `sample` (SCRAM over SSL) producer. */
    @Test
    fun testKafkaScramSslAuthConfig() {
        val expectedConfig = commonProducerConfig + mapOf<String, Any>(
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
            SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
            SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
            SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
            SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
            SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
            SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
            SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
                "username=\"sample-user\" " +
                "password=\"secretpassword\";"
        )

        assertProducerConfig("sample", "kafka-scram-ssl-auth", expectedConfig)
    }

    /** Checks the Kafka configuration generated for the `sample2` (SCRAM over plaintext) producer. */
    @Test
    fun testKafkaScramPlaintextAuthConfig() {
        val expectedConfig = commonProducerConfig + mapOf<String, Any>(
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_PLAINTEXT.toString()
        )

        assertProducerConfig("sample2", "kafka-scram-plain-text-auth", expectedConfig)
    }

    /**
     * Resolves the producer properties registered under [selector] and verifies the topic,
     * the authentication [expectedType], the client id prefix and every entry of [expectedConfig].
     *
     * @param selector producer name appended to [MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX]
     * @param expectedType value expected in the resolved properties' `type`
     * @param expectedConfig Kafka config keys and values that must be present in `getConfig()`
     */
    private fun assertProducerConfig(
        selector: String,
        expectedType: String,
        expectedConfig: Map<String, Any>
    ) {
        val messageProducerProperties = bluePrintMessageLibPropertyService
            .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector")

        val configProps = messageProducerProperties.getConfig()

        // kotlin.test.assertEquals takes (expected, actual, message); keeping that order
        // makes a failure report label the two values correctly.
        assertEquals(
            "default-topic",
            messageProducerProperties.topic,
            "Topic doesn't match the expected value"
        )
        assertEquals(
            expectedType,
            messageProducerProperties.type,
            "Authentication type doesn't match the expected value"
        )

        assertTrue(
            configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
            "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
        )
        // Only the prefix is compared: the test does not assume what follows the configured id.
        assertTrue(
            configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
            "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
        )

        for ((key, expectedValue) in expectedConfig) {
            assertTrue(
                configProps.containsKey(key),
                "Missing expected kafka config key : $key"
            )
            assertEquals(
                expectedValue,
                configProps[key],
                "Unexpected value for $key got ${configProps[key]} instead of $expectedValue"
            )
        }
    }
}