Merge "Patch # 3: Data operation response event (NCMP → Client App) to comply with...
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / async / DataOperationRecordFilterStrategy.java
index 9e2b66a..ce666b1 100644 (file)
@@ -20,9 +20,8 @@
 
 package org.onap.cps.ncmp.api.impl.async;
 
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -41,15 +40,11 @@ public class DataOperationRecordFilterStrategy {
      * @return boolean value.
      */
     @Bean
-    public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
+    public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
         return consumedRecord -> {
-            final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
-            if (eventTypeHeader == null) {
-                return false;
-            }
-            final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
-            return !(eventTypeHeaderValue != null
-                    && eventTypeHeaderValue.contains("DataOperationEvent"));
+            final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(
+                    consumedRecord.headers(), "ce_type");
+            return !(eventTypeHeaderValue.contains("DataOperationEvent"));
         };
     }
 }