package org.onap.cps.ncmp.api.impl.events.avcsubscription
import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
given: 'an event with data category CM'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'notifications are enabled'
objectUnderTest.notificationFeatureEnabled = true
and: 'subscription model loader is enabled'
objectUnderTest.subscriptionModelLoaderEnabled = true
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(testEventSent)
+ objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'the event is mapped to a yangModelSubscription'
1 * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent) >> yangModelSubscriptionEvent
and: 'the event is persisted'
1 * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent)
and: 'the event is forwarded'
- 1 * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent)
+ 1 * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
}
def 'Consume valid CM create message where notifications and model loader are disabled'() {
given: 'an event with data category CM'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'notifications are disabled'
objectUnderTest.notificationFeatureEnabled = false
and: 'subscription model loader is disabled'
objectUnderTest.subscriptionModelLoaderEnabled = false
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(testEventSent)
+ objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'the event is not mapped to a yangModelSubscription'
0 * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(*_) >> yangModelSubscriptionEvent
and: 'the event is not persisted'
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'dataCategory is set to FM'
testEventSent.getEvent().getDataType().setDataCategory("FM")
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(testEventSent)
+ objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'no exception is thrown'
noExceptionThrown()
and: 'the event is not mapped to a yangModelSubscription'
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'datastore is set to a non passthrough datastore'
testEventSent.getEvent().getPredicates().setDatastore("operational")
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(testEventSent)
+ objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'an operation not yet supported exception is thrown'
thrown(OperationNotYetSupportedException)
}