Assign a unique worker ID for CDS Kafka worker 31/118831/7
authorJulien Fontaine <julien.fontaine@bell.ca>
Mon, 15 Feb 2021 23:31:24 +0000 (18:31 -0500)
committerKAPIL SINGAL <ks220y@att.com>
Tue, 16 Mar 2021 19:17:05 +0000 (19:17 +0000)
* Modified CDS Kafka workers to add the 5 lasts characters of the env var HOSTNAME to their worker ID.
* Small refactoring to move some utilitary functions to BlueprintMessageUtils

Issue-ID: CCSDK-3204
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: Iaacd35e9cbe4705d17548518040c679185eaf30a

ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt

index 67dba1f..3e7db95 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.streams.StreamsConfig
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
 
 /** Common Properties **/
 abstract class CommonProperties {
@@ -51,7 +52,7 @@ abstract class MessageProducerProperties : CommonProperties()
 /** Basic Auth */
 open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
 
-    var clientId: String? = null
+    lateinit var clientId: String
     var acks: String = "all" // strongest producing guarantee
     var maxBlockMs: Int = 250 // max blocking time in ms to send a message
     var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
@@ -65,9 +66,7 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
         configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
         configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
         configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
-        if (clientId != null) {
-            configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
-        }
+        configProps[ProducerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
         return configProps
     }
 }
@@ -87,8 +86,8 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer
         val configProps = super.getConfig()
         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
-        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
-        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
+        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
         if (keystore != null) {
             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
@@ -132,7 +131,9 @@ open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties()
 
     override fun getConfig(): HashMap<String, Any> {
         val configProperties = super.getConfig()
-        configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
+        // adjust the worker name with the hostname suffix because we'll have several workers, running in
+        // different pods, using the same worker name otherwise.
+        configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = "$applicationId-${BlueprintMessageUtils.getHostnameSuffix()}"
         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
         configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
         return configProperties
@@ -154,8 +155,8 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer
         val configProps = super.getConfig()
         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
-        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
-        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
+        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
         if (keystore != null) {
             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
@@ -207,7 +208,9 @@ open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties()
         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
         configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
-        configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
+        // adjust the worker name with the hostname suffix because we'll have several workers, running in
+        // different pods, using the same worker name otherwise.
+        configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = "$clientId-${BlueprintMessageUtils.getHostnameSuffix()}"
 
         /** To handle Back pressure, Get only configured record for processing */
         if (pollRecords > 0) {
@@ -233,8 +236,8 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer
         val configProps = super.getConfig()
         configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
         configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
-        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
-        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+        configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore
+        configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword
         if (keystore != null) {
             configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
             configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
index 21fd84d..d40067f 100644 (file)
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.header.internals.RecordHeader
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
@@ -89,12 +88,13 @@ class KafkaMessageProducerService(
                     BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER,
                     BlueprintMessageUtils.kafkaMetricTag(topic)
                 ).increment()
-                log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
+                log.error("Couldn't publish ${clonedMessage::class.simpleName} ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}.", exception)
             } else {
-                val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
-                    "partition(${metadata.partition()}) " +
-                    "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
-                log.info(message)
+                log.info(
+                    "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
+                        "partition(${metadata.partition()}) " +
+                        "offset(${metadata.offset()}) ${BlueprintMessageUtils.getMessageLogData(clonedMessage)}."
+                )
             }
         }
         messageTemplate().send(record, callback)
@@ -104,7 +104,6 @@ class KafkaMessageProducerService(
     fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
         log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
         val configProps = messageProducerProperties.getConfig()
-
         /** Add additional Properties */
         if (additionalConfig != null)
             configProps.putAll(additionalConfig)
@@ -147,18 +146,4 @@ class KafkaMessageProducerService(
             stepData = executionServiceOutput.stepData
         }
     }
-
-    private fun getMessageLogData(message: Any): String {
-        return when (message) {
-            is ExecutionServiceInput -> {
-                val actionIdentifiers = message.actionIdentifiers
-                "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
-            }
-            is ExecutionServiceOutput -> {
-                val actionIdentifiers = message.actionIdentifiers
-                "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
-            }
-            else -> "message($message)"
-        }
-    }
 }
index 7431998..b4817cd 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
 
 import io.micrometer.core.instrument.Tag
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import kotlin.math.max
 
 class BlueprintMessageUtils {
     companion object {
@@ -25,5 +29,27 @@ class BlueprintMessageUtils {
             mutableListOf(
                 Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic)
             )
+
+        /**
+         * get OS hostname's last 5 characters
+         * Used to generate unique client ID.
+         */
+        fun getHostnameSuffix(): String =
+            System.getenv("HOSTNAME").defaultToUUID().let {
+                it.substring(max(0, it.length - 5))
+            }
+
+        fun getMessageLogData(message: Any): String =
+            when (message) {
+                is ExecutionServiceInput -> {
+                    val actionIdentifiers = message.actionIdentifiers
+                    "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+                }
+                is ExecutionServiceOutput -> {
+                    val actionIdentifiers = message.actionIdentifiers
+                    "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+                }
+                else -> "message($message)"
+            }
     }
 }
index fb53ff4..15b7353 100644 (file)
@@ -212,7 +212,6 @@ open class BlueprintMessageConsumerServiceTest {
 
     @Test
     fun testKafkaScramSslAuthConfig() {
-
         val expectedConfig = mapOf<String, Any>(
             ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
             ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
@@ -220,7 +219,6 @@ open class BlueprintMessageConsumerServiceTest {
             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
             ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
             ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
-            ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
             CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
             SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
             SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
@@ -251,6 +249,15 @@ open class BlueprintMessageConsumerServiceTest {
             "Authentication type doesn't match the expected value"
         )
 
+        assertTrue(
+            configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+            "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+        )
+        assertTrue(
+            configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+            "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+        )
+
         expectedConfig.forEach {
             assertTrue(
                 configProps.containsKey(it.key),
index 1490a33..30021a6 100644 (file)
@@ -112,7 +112,6 @@ open class BlueprintMessageProducerServiceTest {
             ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
             ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
             ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
-            ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
             CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
             SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
             SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
@@ -143,6 +142,15 @@ open class BlueprintMessageProducerServiceTest {
             "Authentication type doesn't match the expected value"
         )
 
+        assertTrue(
+            configProps.containsKey(ConsumerConfig.CLIENT_ID_CONFIG),
+            "Missing expected kafka config key : ${ConsumerConfig.CLIENT_ID_CONFIG}"
+        )
+        assertTrue(
+            configProps[ConsumerConfig.CLIENT_ID_CONFIG].toString().startsWith("default-client-id"),
+            "Invalid prefix for ${ConsumerConfig.CLIENT_ID_CONFIG} : ${configProps[ConsumerConfig.CLIENT_ID_CONFIG]} is supposed to start with default-client-id"
+        )
+
         expectedConfig.forEach {
             assertTrue(
                 configProps.containsKey(it.key),
index 849a411..de9ca2c 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
 
 import io.micrometer.core.instrument.Tag
+import io.mockk.every
+import io.mockk.mockkStatic
 import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+
 import kotlin.test.assertEquals
 
 class BlueprintMessageUtilsTest {
@@ -32,4 +37,46 @@ class BlueprintMessageUtilsTest {
 
         assertEquals(expected, tags)
     }
+
+    @Test
+    fun testGetHostnameSuffix() {
+        mockkStatic(System::class)
+        every { System.getenv("HOSTNAME") } returns "qwertyuiop"
+        assertEquals("yuiop", BlueprintMessageUtils.getHostnameSuffix())
+    }
+
+    @Test
+    fun testGetNullHostnameSuffix() {
+        mockkStatic(System::class)
+        every { System.getenv("HOSTNAME") } returns null
+        assertEquals(5, BlueprintMessageUtils.getHostnameSuffix().length)
+    }
+
+    @Test
+    fun testGetMessageLogData() {
+        val input = ExecutionServiceInput().apply {
+            actionIdentifiers = ActionIdentifiers().apply {
+                blueprintName = "bpInput"
+                blueprintVersion = "1.0.0-input"
+                actionName = "bpActionInput"
+            }
+        }
+        val expectedOnInput = "CBA(bpInput/1.0.0-input/bpActionInput)"
+
+        val output = ExecutionServiceInput().apply {
+            actionIdentifiers = ActionIdentifiers().apply {
+                blueprintName = "bpOutput"
+                blueprintVersion = "1.0.0-output"
+                actionName = "bpActionOutput"
+            }
+        }
+        val expectedOnOutput = "CBA(bpOutput/1.0.0-output/bpActionOutput)"
+
+        val otherMessage = "some other message"
+        val expectedOnOtherMessage = "message(some other message)"
+
+        assertEquals(expectedOnInput, BlueprintMessageUtils.getMessageLogData(input))
+        assertEquals(expectedOnOutput, BlueprintMessageUtils.getMessageLogData(output))
+        assertEquals(expectedOnOtherMessage, BlueprintMessageUtils.getMessageLogData(otherMessage))
+    }
 }
index 661e76b..82fd78e 100644 (file)
@@ -116,7 +116,7 @@ open class BlueprintProcessingKafkaConsumer(
                                     "leaderEpoch(${message.leaderEpoch().get()}) " +
                                     "offset(${message.offset()}) " +
                                     "key(${message.key()}) " +
-                                    "CBA(${executionServiceInput.actionIdentifiers.blueprintName}/${executionServiceInput.actionIdentifiers.blueprintVersion}/${executionServiceInput.actionIdentifiers.actionName})"
+                                    BlueprintMessageUtils.getMessageLogData(executionServiceInput)
                             )
                             val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
                             blueprintMessageProducerService.sendMessage(key, executionServiceOutput)