Improve handling of legacy events 57/135057/1
authorToineSiebelink <toine.siebelink@est.tech>
Wed, 21 Jun 2023 15:45:47 +0000 (16:45 +0100)
committerToineSiebelink <toine.siebelink@est.tech>
Wed, 21 Jun 2023 16:01:18 +0000 (17:01 +0100)
Legacy (non-cloud) events would cause a NPE in the event filter
Although the event woudl still be ignored it would lead to uncessary error logging
This fix addressed this issue using a trace level message instead
(also some improvements in the associated test)

Issue-ID: CPS-1724
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Change-Id: Ibf71b13e4a47aaf705e32df5fe50cf41de5f558f

cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy

index ce666b1..76cc0c4 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.async;
 
 import io.cloudevents.CloudEvent;
 import io.cloudevents.kafka.impl.KafkaHeaders;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -31,6 +32,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
  *
  */
 @Configuration
+@Slf4j
 public class DataOperationRecordFilterStrategy {
 
     /**
@@ -42,8 +44,11 @@ public class DataOperationRecordFilterStrategy {
     @Bean
     public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
         return consumedRecord -> {
-            final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(
-                    consumedRecord.headers(), "ce_type");
+            final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(consumedRecord.headers(), "ce_type");
+            if (eventTypeHeaderValue == null) {
+                log.trace("No ce_type header found, possibly a legacy event (ignored)");
+                return true;
+            }
             return !(eventTypeHeaderValue.contains("DataOperationEvent"));
         };
     }
index 3db8520..c0bdf3d 100644 (file)
@@ -1,3 +1,23 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
 package org.onap.cps.ncmp.api.impl.async
 
 import io.cloudevents.CloudEvent
@@ -5,10 +25,12 @@ import io.cloudevents.core.builder.CloudEventBuilder
 import io.cloudevents.kafka.CloudEventSerializer
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.StringSerializer
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry
@@ -23,39 +45,55 @@ import java.util.concurrent.TimeUnit
 @EnableAutoConfiguration
 class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSpec {
 
+    @SpringBean
+    EventsPublisher mockEventsPublisher = Mock()
+
     @Autowired
     private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
 
-    @SpringBean
-    EventsPublisher mockEventsPublisher = Mock()
+    @Value('${app.ncmp.async-m2m.topic}')
+    def topic
 
-    def activateListeners() {
-        kafkaListenerEndpointRegistry.getListenerContainers().forEach(
-                messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
-        )
+    def setup() {
+        activateListeners()
     }
 
-    def 'Filtering Events.'() {
+    def 'Filtering Cloud Events on Type.'() {
         given: 'a cloud event of type: #eventType'
-            def cloudEvent = CloudEventBuilder.v1().withId("some-uuid")
+            def cloudEvent = CloudEventBuilder.v1().withId('some id')
                     .withType(eventType)
-                    .withSource(URI.create("sample-test-source"))
-                    .build();
-        and: 'activate message listener container'
-            activateListeners()
+                    .withSource(URI.create('some-source'))
+                    .build()
         when: 'send the cloud event'
-            ProducerRecord<String, CloudEvent> record = new ProducerRecord<>('ncmp-async-m2m', cloudEvent)
+            ProducerRecord<String, CloudEvent> record = new ProducerRecord<>(topic, cloudEvent)
             KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer))
-            producer.send(record);
+            producer.send(record)
         and: 'wait a little for async processing of message'
             TimeUnit.MILLISECONDS.sleep(100)
         then: 'the event has only been forwarded for the correct type'
-            expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(_, _, _)
-        where:
+            expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(*_)
+        where: 'the following event types are used'
             eventType                                        || expectedNUmberOfCallsToPublishForwardedEvent
             'DataOperationEvent'                             || 1
             'other type'                                     || 0
             'any type contain the word "DataOperationEvent"' || 1
     }
-}
 
+    def 'Non cloud events on same Topic.'() {
+        when: 'sending a non-cloud event on the same topic'
+            ProducerRecord<String, String> record = new ProducerRecord<>(topic, 'simple string event')
+            KafkaProducer<String, String> producer = new KafkaProducer<>(eventProducerConfigProperties(StringSerializer))
+            producer.send(record)
+        and: 'wait a little for async processing of message'
+            TimeUnit.MILLISECONDS.sleep(100)
+        then: 'the event is not processed by this consumer'
+            0 * mockEventsPublisher.publishCloudEvent(*_)
+    }
+
+    def activateListeners() {
+        kafkaListenerEndpointRegistry.getListenerContainers().forEach(
+            messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
+        )
+    }
+
+}