package org.onap.cps.ncmp.api.impl.async;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kafka.common.header.Header;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@Bean
public RecordFilterStrategy<Object, Object> filterBatchDataResponseEvent() {
return consumedRecord -> {
- final String headerValue = SerializationUtils
- .deserialize(consumedRecord.headers().lastHeader("eventType").value());
- return !(headerValue != null
- && headerValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
+ final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
+ if (eventTypeHeader != null) {
+ final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
+ return !(eventTypeHeaderValue != null
+ && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
+ } else {
+ return true;
+ }
};
}
}