Merge "Client to NCMP Subscription schema change"
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / async / DataOperationRecordFilterStrategy.java
index 9e2b66a..76cc0c4 100644 (file)
@@ -20,9 +20,9 @@
 
 package org.onap.cps.ncmp.api.impl.async;
 
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -32,6 +32,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
  *
  */
 @Configuration
+@Slf4j
 public class DataOperationRecordFilterStrategy {
 
     /**
@@ -41,15 +42,14 @@ public class DataOperationRecordFilterStrategy {
      * @return boolean value.
      */
     @Bean
-    public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
+    public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
         return consumedRecord -> {
-            final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
-            if (eventTypeHeader == null) {
-                return false;
+            final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(consumedRecord.headers(), "ce_type");
+            if (eventTypeHeaderValue == null) {
+                log.trace("No ce_type header found, possibly a legacy event (ignored)");
+                return true;
             }
-            final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
-            return !(eventTypeHeaderValue != null
-                    && eventTypeHeaderValue.contains("DataOperationEvent"));
+            return !(eventTypeHeaderValue.contains("DataOperationEvent"));
         };
     }
 }