NCMP : forward bulk response messages to client topic 52/134752/3
authorraviteja.karumuri <raviteja.karumuri@est.tech>
Fri, 2 Jun 2023 09:45:13 +0000 (10:45 +0100)
committerraviteja.karumuri <raviteja.karumuri@est.tech>
Tue, 6 Jun 2023 13:01:08 +0000 (14:01 +0100)
# Fixing the avc subscription event is not consuming even there is a record published on to the topic.

Issue-ID: CPS-1557
Signed-off-by: raviteja.karumuri <raviteja.karumuri@est.tech>
Change-Id: If09fd1849f467785141cc56639839ddda9f2c0de

cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy

index 2c76599..b343d70 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.async;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.kafka.common.header.Header;
+import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -40,16 +41,15 @@ public class BatchRecordFilterStrategy {
      * @return boolean value.
      */
     @Bean
-    public RecordFilterStrategy<Object, Object> filterBatchDataResponseEvent() {
+    public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() {
         return consumedRecord -> {
             final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
-            if (eventTypeHeader != null) {
-                final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
-                return !(eventTypeHeaderValue != null
-                        && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
-            } else {
-                return true;
+            if (eventTypeHeader == null) {
+                return false;
             }
+            final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
+            return !(eventTypeHeaderValue != null
+                    && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
         };
     }
 }
index 65c43a0..28464bb 100644 (file)
@@ -55,7 +55,7 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
     JsonObjectMapper jsonObjectMapper
 
     @Autowired
-    RecordFilterStrategy<Object, Object> recordFilterStrategy
+    RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy
 
     def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
     def static clientTopic = 'client-topic'