Merge "CmHandle delete is failing with InternalServerError: Null key is not allowed...
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / async / FilterStrategiesIntegrationSpec.groovy
index 43d0648..fba1f95 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (c) 2023 Nordix Foundation.
+ * Copyright (c) 2023-2024 Nordix Foundation.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 package org.onap.cps.ncmp.api.impl.async
 
 import io.cloudevents.core.builder.CloudEventBuilder
+import org.onap.cps.events.EventsPublisher
 import org.onap.cps.ncmp.api.impl.config.kafka.KafkaConfig
-import org.onap.cps.ncmp.api.impl.events.EventsPublisher
-import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.api.kafka.ConsumerBaseSpec
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
 import org.spockframework.spring.SpringBean
-import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.beans.factory.annotation.Value
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry
-import org.springframework.kafka.test.utils.ContainerTestUtils
 import org.springframework.test.annotation.DirtiesContext
 import org.testcontainers.spock.Testcontainers
+import spock.util.concurrent.PollingConditions
 import java.util.concurrent.TimeUnit
 
 @SpringBootTest(classes =[DataOperationEventConsumer, AsyncRestRequestResponseEventConsumer, RecordFilterStrategies, KafkaConfig])
 @DirtiesContext
 @Testcontainers
 @EnableAutoConfiguration
-class FilterStrategiesIntegrationSpec extends MessagingBaseSpec {
+class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
 
     @SpringBean
     EventsPublisher mockEventsPublisher = Mock()
@@ -48,39 +46,38 @@ class FilterStrategiesIntegrationSpec extends MessagingBaseSpec {
     @SpringBean
     NcmpAsyncRequestResponseEventMapper mapper = Stub()
 
-    @Autowired
-    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
-
     @Value('${app.ncmp.async-m2m.topic}')
     def topic
 
-    def setup() {
-        activateListeners()
-    }
-
     def 'Legacy event consumer with cloud event.'() {
-        given: 'a cloud event of type: #eventType'
+        given: 'a data operation cloud event type'
             def cloudEvent = CloudEventBuilder.v1().withId('some id')
                 .withType('DataOperationEvent')
                 .withSource(URI.create('some-source'))
                 .build()
         when: 'send the cloud event'
             cloudEventKafkaTemplate.send(topic, cloudEvent)
-        and: 'wait a little for async processing of message'
+        then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
             TimeUnit.MILLISECONDS.sleep(300)
-        then: 'event is not consumed'
+        and: 'event is not consumed'
             0 * mockEventsPublisher.publishEvent(*_)
     }
 
     def 'Legacy event consumer with valid legacy event.'() {
-        given: 'a cloud event of type: #eventType'
+        given: 'a legacy event'
             DmiAsyncRequestResponseEvent legacyEvent = new DmiAsyncRequestResponseEvent(eventId:'legacyEventId', eventTarget:'legacyEventTarget')
+        and: 'a flag to track the publish event call'
+            def publishEventMethodCalled = false
+        and: 'the (mocked) events publisher will use the flag to indicate if it is called'
+            mockEventsPublisher.publishEvent(*_) >> {
+                publishEventMethodCalled = true
+            }
         when: 'send the cloud event'
             legacyEventKafkaTemplate.send(topic, legacyEvent)
-        and: 'wait a little for async processing of message'
-            TimeUnit.MILLISECONDS.sleep(300)
         then: 'the event is consumed by the (legacy) AsynRestRequest consumer'
-            1 * mockEventsPublisher.publishEvent(*_)
+            new PollingConditions().within(1) {
+                assert publishEventMethodCalled == true
+            }
     }
 
     def 'Filtering Cloud Events on Type.'() {
@@ -89,17 +86,23 @@ class FilterStrategiesIntegrationSpec extends MessagingBaseSpec {
                 .withType(eventType)
                 .withSource(URI.create('some-source'))
                 .build()
+        and: 'a flag to track the publish event call'
+            def publishEventMethodCalled = false
+        and: 'the (mocked) events publisher will use the flag to indicate if it is called'
+            mockEventsPublisher.publishCloudEvent(*_) >> {
+                publishEventMethodCalled = true
+            }
         when: 'send the cloud event'
             cloudEventKafkaTemplate.send(topic, cloudEvent)
-        and: 'wait a little for async processing of message'
-            TimeUnit.MILLISECONDS.sleep(300)
         then: 'the event has only been forwarded for the correct type'
-            expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(*_)
+            new PollingConditions(initialDelay: 0.3).within(1) {
+                assert publishEventMethodCalled == expectCallToPublishEventMethod
+            }
         where: 'the following event types are used'
-            eventType                                        || expectedNUmberOfCallsToPublishForwardedEvent
-            'DataOperationEvent'                             || 1
-            'other type'                                     || 0
-            'any type contain the word "DataOperationEvent"' || 1
+            eventType                                        || expectCallToPublishEventMethod
+            'DataOperationEvent'                             || true
+            'other type'                                     || false
+            'any type contain the word "DataOperationEvent"' || true
     }
 
     //TODO Toine, add positive test with data to prove event is converted correctly (using correct factory)
@@ -107,15 +110,10 @@ class FilterStrategiesIntegrationSpec extends MessagingBaseSpec {
     def 'Non cloud events on same Topic.'() {
         when: 'sending a non-cloud event on the same topic'
             legacyEventKafkaTemplate.send(topic, 'simple string event')
-        and: 'wait a little for async processing of message'
+        then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
             TimeUnit.MILLISECONDS.sleep(300)
-        then: 'the event is not processed by this consumer'
+        and: 'the event is not processed by this consumer'
             0 * mockEventsPublisher.publishCloudEvent(*_)
     }
 
-    def activateListeners() {
-        kafkaListenerEndpointRegistry.getListenerContainers().forEach(
-            messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
-        )
-    }
 }