Renaming Files having BluePrint to have Blueprint
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / BlueprintMessageLibData.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message
19
20 import org.apache.kafka.clients.CommonClientConfigs
21 import org.apache.kafka.clients.consumer.ConsumerConfig
22 import org.apache.kafka.clients.producer.ProducerConfig
23 import org.apache.kafka.common.config.SaslConfigs
24 import org.apache.kafka.common.config.SslConfigs
25 import org.apache.kafka.common.security.auth.SecurityProtocol
26 import org.apache.kafka.common.security.scram.ScramLoginModule
27 import org.apache.kafka.common.serialization.ByteArrayDeserializer
28 import org.apache.kafka.common.serialization.ByteArraySerializer
29 import org.apache.kafka.common.serialization.StringDeserializer
30 import org.apache.kafka.common.serialization.StringSerializer
31 import org.apache.kafka.streams.StreamsConfig
32
33 /** Common Properties **/
34 abstract class CommonProperties {
35
36     lateinit var type: String
37     lateinit var topic: String
38     lateinit var bootstrapServers: String
39
40     open fun getConfig(): HashMap<String, Any> {
41         val configProps = hashMapOf<String, Any>()
42         configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
43         return configProps
44     }
45 }
46
47 /** Message Producer */
48 /** Message Producer Properties **/
49 abstract class MessageProducerProperties : CommonProperties()
50
51 /** Basic Auth */
52 open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
53
54     var clientId: String? = null
55     var acks: String = "all" // strongest producing guarantee
56     var maxBlockMs: Int = 250 // max blocking time in ms to send a message
57     var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
58     var enableIdempotence: Boolean = true // ensure we don't push duplicates
59
60     override fun getConfig(): HashMap<String, Any> {
61         val configProps = super.getConfig()
62         configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
63         configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
64         configProps[ProducerConfig.ACKS_CONFIG] = acks
65         configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
66         configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
67         configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
68         if (clientId != null) {
69             configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
70         }
71         return configProps
72     }
73 }
74
75 /** SSL Auth */
76 open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
77
78     lateinit var truststore: String
79     lateinit var truststorePassword: String
80     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
81     var keystore: String? = null
82     var keystorePassword: String? = null
83     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
84     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
85
86     override fun getConfig(): HashMap<String, Any> {
87         val configProps = super.getConfig()
88         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
89         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
90         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
91         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
92         if (keystore != null) {
93             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
94             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
95             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
96         }
97         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
98
99         return configProps
100     }
101 }
102
103 /** (SASL) SCRAM SSL Auth */
104 class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
105
106     var saslMechanism: String = "SCRAM-SHA-512"
107     lateinit var scramUsername: String
108     lateinit var scramPassword: String
109
110     override fun getConfig(): HashMap<String, Any> {
111         val configProps = super.getConfig()
112         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
113         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
114         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
115             "username=\"${scramUsername}\" " +
116             "password=\"${scramPassword}\";"
117         return configProps
118     }
119 }
120
121 /** Consumer */
122 abstract class MessageConsumerProperties : CommonProperties()
123 /** Kafka Streams */
124 /** Streams properties */
125
126 /** Basic Auth */
127 open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
128
129     lateinit var applicationId: String
130     var autoOffsetReset: String = "latest"
131     var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
132
133     override fun getConfig(): HashMap<String, Any> {
134         val configProperties = super.getConfig()
135         configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
136         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
137         configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
138         return configProperties
139     }
140 }
141
142 /** SSL Auth */
143 open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
144
145     lateinit var truststore: String
146     lateinit var truststorePassword: String
147     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
148     var keystore: String? = null
149     var keystorePassword: String? = null
150     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
151     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
152
153     override fun getConfig(): HashMap<String, Any> {
154         val configProps = super.getConfig()
155         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
156         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
157         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
158         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
159         if (keystore != null) {
160             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
161             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
162             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
163         }
164         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
165         return configProps
166     }
167 }
168
169 /** (SASL) SCRAM SSL Auth */
170 class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
171
172     var saslMechanism: String = "SCRAM-SHA-512"
173     lateinit var scramUsername: String
174     lateinit var scramPassword: String
175
176     override fun getConfig(): HashMap<String, Any> {
177         val configProps = super.getConfig()
178         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
179         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
180         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
181             "username=\"${scramUsername}\" " +
182             "password=\"${scramPassword}\";"
183         return configProps
184     }
185 }
186
187 /** Message Consumer */
188 /** Message Consumer Properties **/
189 /** Basic Auth */
190 open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
191
192     lateinit var groupId: String
193     lateinit var clientId: String
194     var autoCommit: Boolean = true
195     var autoOffsetReset: String = "latest"
196     var pollMillSec: Long = 1000
197     var pollRecords: Int = -1
198
199     override fun getConfig(): HashMap<String, Any> {
200         val configProperties = super.getConfig()
201         configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
202         configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
203         /**
204          * earliest: automatically reset the offset to the earliest offset
205          * latest: automatically reset the offset to the latest offset
206          */
207         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
208         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
209         configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
210         configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
211
212         /** To handle Back pressure, Get only configured record for processing */
213         if (pollRecords > 0) {
214             configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
215         }
216
217         return configProperties
218     }
219 }
220
221 /** SSL Auth */
222 open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
223
224     lateinit var truststore: String
225     lateinit var truststorePassword: String
226     var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
227     var keystore: String? = null
228     var keystorePassword: String? = null
229     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
230     var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
231
232     override fun getConfig(): HashMap<String, Any> {
233         val configProps = super.getConfig()
234         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
235         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
236         configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
237         configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
238         if (keystore != null) {
239             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
240             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
241             configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
242         }
243         configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
244         return configProps
245     }
246 }
247
248 /** (SASL) SCRAM SSL Auth */
249 class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
250
251     var saslMechanism: String = "SCRAM-SHA-512"
252     lateinit var scramUsername: String
253     lateinit var scramPassword: String
254
255     override fun getConfig(): HashMap<String, Any> {
256         val configProps = super.getConfig()
257         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
258         configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
259         configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
260             "username=\"${scramUsername}\" " +
261             "password=\"${scramPassword}\";"
262         return configProps
263     }
264 }