Merge "rb-version changed to vf-module-model-customization-uuid"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / BluePrintMessageLibData.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message
19
20 import org.apache.kafka.clients.CommonClientConfigs
21 import org.apache.kafka.clients.consumer.ConsumerConfig
22 import org.apache.kafka.clients.producer.ProducerConfig
23 import org.apache.kafka.common.config.SaslConfigs
24 import org.apache.kafka.common.config.SslConfigs
25 import org.apache.kafka.common.security.auth.SecurityProtocol
26 import org.apache.kafka.common.security.scram.ScramLoginModule
27 import org.apache.kafka.common.serialization.ByteArrayDeserializer
28 import org.apache.kafka.common.serialization.ByteArraySerializer
29 import org.apache.kafka.common.serialization.StringDeserializer
30 import org.apache.kafka.common.serialization.StringSerializer
31 import org.apache.kafka.streams.StreamsConfig
32 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
33
34 /** Common Properties **/
35 abstract class CommonProperties {
36
37     lateinit var type: String
38     lateinit var topic: String
39     lateinit var bootstrapServers: String
40
41     open fun getConfig(): HashMap<String, Any> {
42         val configProps = hashMapOf<String, Any>()
43         configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
44         return configProps
45     }
46 }
47
48 /** Message Producer */
49 /** Message Producer Properties **/
50 abstract class MessageProducerProperties : CommonProperties()
51
52 /** Basic Auth */
53 open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
54
55     lateinit var clientId: String
56     var acks: String = "all" // strongest producing guarantee
57     var maxBlockMs: Int = 250 // max blocking time in ms to send a message
58     var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
59     var enableIdempotence: Boolean = true // ensure we don't push duplicates
60
61     override fun getConfig(): HashMap<String, Any> {
62         val configProps = super.getConfig()
63         configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
64         configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
65         configProps[ProducerConfig.ACKS_CONFIG] = acks
66         configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
67         configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
68         configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
69         configProps[ProducerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
70         return configProps
71     }
72 }
73
74 /** SSL Auth */
75 open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
76
77     lateinit var truststore: String
78     lateinit var truststorePassword: String
79     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
80     var keystore: String? = null
81     var keystorePassword: String? = null
82     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
83     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
84
85     override fun getConfig(): HashMap<String, Any> {
86         val configProps = super.getConfig()
87         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
88         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
89         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
90         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
91         if (keystore != null) {
92             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
93             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
94             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
95         }
96         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
97
98         return configProps
99     }
100 }
101
102 /** (SASL) SCRAM SSL Auth */
103 class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
104
105     var saslMechanism: String = "SCRAM-SHA-512"
106     lateinit var scramUsername: String
107     lateinit var scramPassword: String
108
109     override fun getConfig(): HashMap<String, Any> {
110         val configProps = super.getConfig()
111         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
112         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
113         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
114             "username=\"${scramUsername}\" " +
115             "password=\"${scramPassword}\";"
116         return configProps
117     }
118 }
119
120 /** Consumer */
121 abstract class MessageConsumerProperties : CommonProperties()
122 /** Kafka Streams */
123 /** Streams properties */
124
125 /** Basic Auth */
126 open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
127
128     lateinit var applicationId: String
129     var autoOffsetReset: String = "latest"
130     var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
131
132     override fun getConfig(): HashMap<String, Any> {
133         val configProperties = super.getConfig()
134         // adjust the worker name with the hostname suffix because we'll have several workers, running in
135         // different pods, using the same worker name otherwise.
136         configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = "$applicationId-${BlueprintMessageUtils.getHostnameSuffix()}"
137         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
138         configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
139         return configProperties
140     }
141 }
142
143 /** SSL Auth */
144 open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
145
146     lateinit var truststore: String
147     lateinit var truststorePassword: String
148     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
149     var keystore: String? = null
150     var keystorePassword: String? = null
151     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
152     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
153
154     override fun getConfig(): HashMap<String, Any> {
155         val configProps = super.getConfig()
156         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
157         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
158         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
159         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
160         if (keystore != null) {
161             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
162             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
163             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
164         }
165         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
166         return configProps
167     }
168 }
169
170 /** (SASL) SCRAM SSL Auth */
171 class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
172
173     var saslMechanism: String = "SCRAM-SHA-512"
174     lateinit var scramUsername: String
175     lateinit var scramPassword: String
176
177     override fun getConfig(): HashMap<String, Any> {
178         val configProps = super.getConfig()
179         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
180         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
181         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
182             "username=\"${scramUsername}\" " +
183             "password=\"${scramPassword}\";"
184         return configProps
185     }
186 }
187
188 /** Message Consumer */
189 /** Message Consumer Properties **/
190 /** Basic Auth */
191 open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
192
193     lateinit var groupId: String
194     lateinit var clientId: String
195     var autoCommit: Boolean = true
196     var autoOffsetReset: String = "latest"
197     var pollMillSec: Long = 1000
198     var pollRecords: Int = -1
199
200     override fun getConfig(): HashMap<String, Any> {
201         val configProperties = super.getConfig()
202         configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
203         configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
204         /**
205          * earliest: automatically reset the offset to the earliest offset
206          * latest: automatically reset the offset to the latest offset
207          */
208         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
209         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
210         configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
211         // adjust the worker name with the hostname suffix because we'll have several workers, running in
212         // different pods, using the same worker name otherwise.
213         configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
214
215         /** To handle Back pressure, Get only configured record for processing */
216         if (pollRecords > 0) {
217             configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
218         }
219
220         return configProperties
221     }
222 }
223
224 /** SSL Auth */
225 open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
226
227     lateinit var truststore: String
228     lateinit var truststorePassword: String
229     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
230     var keystore: String? = null
231     var keystorePassword: String? = null
232     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
233     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
234
235     override fun getConfig(): HashMap<String, Any> {
236         val configProps = super.getConfig()
237         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
238         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
239         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
240         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
241         if (keystore != null) {
242             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
243             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
244             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
245         }
246         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
247         return configProps
248     }
249 }
250
251 /** (SASL) SCRAM SSL Auth */
252 class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
253
254     var saslMechanism: String = "SCRAM-SHA-512"
255     lateinit var scramUsername: String
256     lateinit var scramPassword: String
257
258     override fun getConfig(): HashMap<String, Any> {
259         val configProps = super.getConfig()
260         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
261         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
262         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
263             "username=\"${scramUsername}\" " +
264             "password=\"${scramPassword}\";"
265         return configProps
266     }
267 }