/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2023 Nordix Foundation.
+ * Copyright (c) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.ncmp.api.impl.async
import io.cloudevents.core.builder.CloudEventBuilder
+import org.onap.cps.events.EventsPublisher
import org.onap.cps.ncmp.api.impl.config.kafka.KafkaConfig
-import org.onap.cps.ncmp.api.impl.events.EventsPublisher
-import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.api.kafka.ConsumerBaseSpec
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
import org.spockframework.spring.SpringBean
-import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry
-import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.test.annotation.DirtiesContext
import org.testcontainers.spock.Testcontainers
+import spock.util.concurrent.PollingConditions
import java.util.concurrent.TimeUnit
@SpringBootTest(classes =[DataOperationEventConsumer, AsyncRestRequestResponseEventConsumer, RecordFilterStrategies, KafkaConfig])
@DirtiesContext
@Testcontainers
@EnableAutoConfiguration
-class FilterStrategiesIntegrationSpec extends MessagingBaseSpec {
+class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
@SpringBean
EventsPublisher mockEventsPublisher = Mock()
@SpringBean
NcmpAsyncRequestResponseEventMapper mapper = Stub()
- @Autowired
- private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
-
@Value('${app.ncmp.async-m2m.topic}')
def topic
- def setup() {
- activateListeners()
- }
-
def 'Legacy event consumer with cloud event.'() {
- given: 'a cloud event of type: #eventType'
+ given: 'a data operation cloud event type'
def cloudEvent = CloudEventBuilder.v1().withId('some id')
.withType('DataOperationEvent')
.withSource(URI.create('some-source'))
.build()
when: 'send the cloud event'
cloudEventKafkaTemplate.send(topic, cloudEvent)
- and: 'wait a little for async processing of message'
+ then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
TimeUnit.MILLISECONDS.sleep(300)
- then: 'event is not consumed'
+ and: 'event is not consumed'
0 * mockEventsPublisher.publishEvent(*_)
}
def 'Legacy event consumer with valid legacy event.'() {
- given: 'a cloud event of type: #eventType'
+ given: 'a legacy event'
DmiAsyncRequestResponseEvent legacyEvent = new DmiAsyncRequestResponseEvent(eventId:'legacyEventId', eventTarget:'legacyEventTarget')
+ and: 'a flag to track the publish event call'
+ def publishEventMethodCalled = false
+ and: 'the (mocked) events publisher will use the flag to indicate if it is called'
+ mockEventsPublisher.publishEvent(*_) >> {
+ publishEventMethodCalled = true
+ }
when: 'send the cloud event'
legacyEventKafkaTemplate.send(topic, legacyEvent)
- and: 'wait a little for async processing of message'
- TimeUnit.MILLISECONDS.sleep(300)
then: 'the event is consumed by the (legacy) AsynRestRequest consumer'
- 1 * mockEventsPublisher.publishEvent(*_)
+ new PollingConditions().within(1) {
+ assert publishEventMethodCalled == true
+ }
}
def 'Filtering Cloud Events on Type.'() {
.withType(eventType)
.withSource(URI.create('some-source'))
.build()
+ and: 'a flag to track the publish event call'
+ def publishEventMethodCalled = false
+ and: 'the (mocked) events publisher will use the flag to indicate if it is called'
+ mockEventsPublisher.publishCloudEvent(*_) >> {
+ publishEventMethodCalled = true
+ }
when: 'send the cloud event'
cloudEventKafkaTemplate.send(topic, cloudEvent)
- and: 'wait a little for async processing of message'
- TimeUnit.MILLISECONDS.sleep(300)
then: 'the event has only been forwarded for the correct type'
- expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(*_)
+ new PollingConditions(initialDelay: 0.3).within(1) {
+ assert publishEventMethodCalled == expectCallToPublishEventMethod
+ }
where: 'the following event types are used'
- eventType || expectedNUmberOfCallsToPublishForwardedEvent
- 'DataOperationEvent' || 1
- 'other type' || 0
- 'any type contain the word "DataOperationEvent"' || 1
+ eventType || expectCallToPublishEventMethod
+ 'DataOperationEvent' || true
+ 'other type' || false
+ 'any type contain the word "DataOperationEvent"' || true
}
//TODO Toine, add positive test with data to prove event is converted correctly (using correct factory)
def 'Non cloud events on same Topic.'() {
when: 'sending a non-cloud event on the same topic'
legacyEventKafkaTemplate.send(topic, 'simple string event')
- and: 'wait a little for async processing of message'
+ then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
TimeUnit.MILLISECONDS.sleep(300)
- then: 'the event is not processed by this consumer'
+ and: 'the event is not processed by this consumer'
0 * mockEventsPublisher.publishCloudEvent(*_)
}
- def activateListeners() {
- kafkaListenerEndpointRegistry.getListenerContainers().forEach(
- messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
- )
- }
}