Fixed Kafka consumer behaviour on failed deserialization 68/91868/1
authorprathameshmo <prathamesh.morde@bell.ca>
Mon, 22 Jul 2019 17:48:51 +0000 (13:48 -0400)
committerprathameshmo <prathamesh_morde@yahoo.ca>
Tue, 23 Jul 2019 13:56:27 +0000 (09:56 -0400)
-Added ErrorHandlingDeserializer
-Updated the integration test.

Issue-ID: CCSDK-1514
Change-Id: I69112df850dfae2d4a3bd967b1dcfa541ea1523a
Signed-off-by: prathameshmo <prathamesh_morde@yahoo.ca>
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip [deleted file]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip [new file with mode: 0755]

index a04a799..17e157d 100644 (file)
@@ -1,5 +1,7 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
+import com.fasterxml.jackson.databind.DeserializationFeature
+import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.StringDeserializer
@@ -7,10 +9,10 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
 import org.springframework.beans.factory.annotation.Value
 import org.springframework.context.annotation.Bean
 import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.annotation.EnableKafka
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
 import org.springframework.kafka.core.ConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
 import org.springframework.kafka.support.serializer.JsonDeserializer
 
 @Configuration
@@ -26,11 +28,20 @@ open class MessagingConfig {
         val configProperties = hashMapOf<String, Any>()
         configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
         configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
-        configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
-        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
-        configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+        configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java
+        configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name
 
-        return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java))
+        val deserializer = JsonDeserializer<ExecutionServiceInput>()
+        deserializer.setRemoveTypeHeaders(true)
+        deserializer.addTrustedPackages("*")
+
+        val jsonDeserializer =  JsonDeserializer(ExecutionServiceInput::class.java,
+                ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
+
+        return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
+                ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer))
     }
 
     /**
index 1d219a8..54cc0c1 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 import kotlinx.coroutines.async
 import kotlinx.coroutines.runBlocking
 import org.apache.commons.lang3.builder.ToStringBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.slf4j.LoggerFactory
@@ -39,17 +40,15 @@ open class MessagingController(private val propertyService: BluePrintMessageLibP
     }
 
     @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
-    open fun receive(input: ExecutionServiceInput) {
-
-        log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+    open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) {
 
         runBlocking {
-            log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+            log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value()))
 
             // Process the message.
             async {
-                processMessage(input)
-            }
+                processMessage(record.value())
+            }.await()
         }
     }
 
index f7459f5..602033a 100644 (file)
@@ -90,7 +90,7 @@ class MessagingControllerTest {
     @Autowired
     lateinit var webTestClient: WebTestClient
 
-    var receivedEvent: String? = null
+    var event: ExecutionServiceInput? = null
 
     @Before
     fun setup() {
@@ -142,11 +142,13 @@ class MessagingControllerTest {
         log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input))
 
         Thread.sleep(1000)
+
+        assertNotNull(event)
     }
 
     @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])])
-    fun receivedEventFromBluePrintProducer(event: ExecutionServiceInput) {
-        assertNotNull(event)
+    fun receivedEventFromBluePrintProducer(receivedEvent: ExecutionServiceInput) {
+        event = receivedEvent
     }
 
     private fun uploadBluePrint() {
@@ -172,7 +174,7 @@ class MessagingControllerTest {
     }
 
     private fun loadCbaArchive():File {
-        return Paths.get("./src/test/resources/cba-for-kafka-integration.zip").toFile()
+        return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile()
     }
 
     @Configuration
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip
deleted file mode 100644 (file)
index 2307038..0000000
Binary files a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip and /dev/null differ
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip
new file mode 100755 (executable)
index 0000000..9581191
Binary files /dev/null and b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip differ