ac35fbf2c339f858531d06ab448e8c4c8a140472
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / BluePrintMessageLibData.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message
19
20 import org.apache.kafka.clients.CommonClientConfigs
21 import org.apache.kafka.clients.consumer.ConsumerConfig
22 import org.apache.kafka.clients.producer.ProducerConfig
23 import org.apache.kafka.common.config.SaslConfigs
24 import org.apache.kafka.common.config.SslConfigs
25 import org.apache.kafka.common.security.auth.SecurityProtocol
26 import org.apache.kafka.common.security.scram.ScramLoginModule
27 import org.apache.kafka.common.serialization.ByteArrayDeserializer
28 import org.apache.kafka.common.serialization.ByteArraySerializer
29 import org.apache.kafka.common.serialization.StringDeserializer
30 import org.apache.kafka.common.serialization.StringSerializer
31 import org.apache.kafka.streams.StreamsConfig
32
33 /** Common Properties **/
34 abstract class CommonProperties {
35     lateinit var type: String
36     lateinit var topic: String
37     lateinit var bootstrapServers: String
38
39     open fun getConfig(): HashMap<String, Any> {
40         val configProps = hashMapOf<String, Any>()
41         configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
42         return configProps
43     }
44 }
45
46 /** Message Producer */
47 /** Message Producer Properties **/
48 abstract class MessageProducerProperties : CommonProperties()
49
50 /** Basic Auth */
51 open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
52
53     var clientId: String? = null
54     // strongest producing guarantee
55     var acks: String = "all"
56     var retries: Int = 0
57     // ensure we don't push duplicates
58     var enableIdempotence: Boolean = true
59
60     override fun getConfig(): HashMap<String, Any> {
61         val configProps = super.getConfig()
62         configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
63         configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
64         configProps[ProducerConfig.ACKS_CONFIG] = acks
65         configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
66         if (clientId != null) {
67             configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
68         }
69         return configProps
70     }
71 }
72
73 /** SSL Auth */
74 open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
75     lateinit var truststore: String
76     lateinit var truststorePassword: String
77     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
78     var keystore: String? = null
79     var keystorePassword: String? = null
80     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
81     var sslEndpointIdentificationAlgorithm: String = ""
82
83     override fun getConfig(): HashMap<String, Any> {
84         val configProps = super.getConfig()
85         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
86         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
87         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
88         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
89         if (keystore != null) {
90             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
91             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
92             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
93         }
94         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
95
96         return configProps
97     }
98 }
99
100 /** (SASL) SCRAM SSL Auth */
101 class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
102     var saslMechanism: String = "SCRAM-SHA-512"
103     lateinit var scramUsername: String
104     lateinit var scramPassword: String
105
106     override fun getConfig(): HashMap<String, Any> {
107         val configProps = super.getConfig()
108         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
109         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
110         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
111                 "username=\"${scramUsername}\" " +
112                 "password=\"${scramPassword}\";"
113         return configProps
114     }
115 }
116
117 /** Consumer */
118 abstract class MessageConsumerProperties : CommonProperties()
119 /** Kafka Streams */
120 /** Streams properties */
121
122 /** Basic Auth */
123 open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
124     lateinit var applicationId: String
125     var autoOffsetReset: String = "latest"
126     var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
127
128     override fun getConfig(): HashMap<String, Any> {
129         val configProperties = super.getConfig()
130         configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
131         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
132         configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
133         return configProperties
134     }
135 }
136
137 /** SSL Auth */
138 open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
139     lateinit var truststore: String
140     lateinit var truststorePassword: String
141     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
142     var keystore: String? = null
143     var keystorePassword: String? = null
144     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
145     var sslEndpointIdentificationAlgorithm: String = ""
146
147     override fun getConfig(): HashMap<String, Any> {
148         val configProps = super.getConfig()
149         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
150         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
151         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
152         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
153         if (keystore != null) {
154             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
155             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
156             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
157         }
158         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
159         return configProps
160     }
161 }
162
163 /** (SASL) SCRAM SSL Auth */
164 class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
165     var saslMechanism: String = "SCRAM-SHA-512"
166     lateinit var scramUsername: String
167     lateinit var scramPassword: String
168
169     override fun getConfig(): HashMap<String, Any> {
170         val configProps = super.getConfig()
171         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
172         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
173         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
174                 "username=\"${scramUsername}\" " +
175                 "password=\"${scramPassword}\";"
176         return configProps
177     }
178 }
179
180 /** Message Consumer */
181 /** Message Consumer Properties **/
182 /** Basic Auth */
183 open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
184     lateinit var groupId: String
185     lateinit var clientId: String
186     var autoCommit: Boolean = true
187     var autoOffsetReset: String = "latest"
188     var pollMillSec: Long = 1000
189     var pollRecords: Int = -1
190
191     override fun getConfig(): HashMap<String, Any> {
192         val configProperties = super.getConfig()
193         configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
194         configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
195         /**
196          * earliest: automatically reset the offset to the earliest offset
197          * latest: automatically reset the offset to the latest offset
198          */
199         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
200         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
201         configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
202         configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
203
204         /** To handle Back pressure, Get only configured record for processing */
205         if (pollRecords > 0) {
206             configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
207         }
208
209         return configProperties
210     }
211 }
212
213 /** SSL Auth */
214 open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
215     lateinit var truststore: String
216     lateinit var truststorePassword: String
217     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
218     var keystore: String? = null
219     var keystorePassword: String? = null
220     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
221     var sslEndpointIdentificationAlgorithm: String = ""
222
223     override fun getConfig(): HashMap<String, Any> {
224         val configProps = super.getConfig()
225         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
226         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
227         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
228         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
229         if (keystore != null) {
230             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
231             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
232             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
233         }
234         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
235         return configProps
236     }
237 }
238
239 /** (SASL) SCRAM SSL Auth */
240 class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
241     var saslMechanism: String = "SCRAM-SHA-512"
242     lateinit var scramUsername: String
243     lateinit var scramPassword: String
244
245     override fun getConfig(): HashMap<String, Any> {
246         val configProps = super.getConfig()
247         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
248         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
249         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
250                 "username=\"${scramUsername}\" " +
251                 "password=\"${scramPassword}\";"
252         return configProps
253     }
254 }