import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.header.Header;
+import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
* @return boolean value.
*/
@Bean
- public RecordFilterStrategy<Object, Object> filterBatchDataResponseEvent() {
+ public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() {
return consumedRecord -> {
final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
- if (eventTypeHeader != null) {
- final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
- return !(eventTypeHeaderValue != null
- && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
- } else {
- return true;
+ if (eventTypeHeader == null) {
+ return false;
}
+ final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
+ return !(eventTypeHeaderValue != null
+ && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent"));
};
}
}
JsonObjectMapper jsonObjectMapper
@Autowired
- RecordFilterStrategy<Object, Object> recordFilterStrategy
+ RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy
def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
def static clientTopic = 'client-topic'