package org.onap.cps.ncmp.api.impl.events.cmsubscription
+import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DATASPACE_NAME
+
import com.fasterxml.jackson.databind.ObjectMapper
import com.hazelcast.map.IMap
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistenceImpl
-import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventResponseCloudMapper
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent
import org.onap.cps.ncmp.utils.TestUtils
IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
def mockSubscriptionPersistence = Mock(SubscriptionPersistenceImpl)
def mockSubscriptionEventResponseMapper = Mock(CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper)
- def mockSubscriptionEventResponseOutcome = Mock(CmSubscriptionNcmpOutEventPublisher)
- def mockSubscriptionEventResponseCloudMapper = new SubscriptionEventResponseCloudMapper(new ObjectMapper())
+ def mockCmSubscriptionNcmpOutEventPublisher = Mock(CmSubscriptionNcmpOutEventPublisher)
def objectUnderTest = new CmSubscriptionDmiOutEventConsumer(mockForwardedSubscriptionEventCache,
- mockSubscriptionPersistence, mockSubscriptionEventResponseMapper, mockSubscriptionEventResponseOutcome, mockSubscriptionEventResponseCloudMapper)
+ mockSubscriptionPersistence, mockSubscriptionEventResponseMapper, mockCmSubscriptionNcmpOutEventPublisher)
- def 'Consume Subscription Event Response where all DMIs have responded'() {
- given: 'a consumer record including cloud event having subscription response'
- def consumerRecordWithCloudEventAndSubscriptionResponse = getConsumerRecord()
- and: 'a subscription response event'
- def subscriptionResponseEvent = getSubscriptionResponseEvent()
- and: 'a subscription event response and notifications are enabled'
+ def 'Consume dmi out event where all DMIs have responded'() {
+ given: 'a consumer record including cloud event having dmi out event'
+ def dmiOutConsumerRecord = getDmiOutConsumerRecord()
+ and: 'notifications are enabled'
objectUnderTest.notificationFeatureEnabled = notificationEnabled
and: 'subscription model loader is enabled'
objectUnderTest.subscriptionModelLoaderEnabled = modelLoaderEnabled
and: 'subscription persistence service returns data node includes no pending cm handle'
mockSubscriptionPersistence.getCmHandlesForSubscriptionEvent(*_) >> [getDataNode()]
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEventResponse(consumerRecordWithCloudEventAndSubscriptionResponse)
+ objectUnderTest.consumeDmiOutEvent(dmiOutConsumerRecord)
then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
1 * mockForwardedSubscriptionEventCache.containsKey('SCO-9989752cm-subscription-001') >> true
1 * mockForwardedSubscriptionEventCache.get('SCO-9989752cm-subscription-001') >> (['some-dmi-name'] as Set)
and: 'the subscription event is removed from the map'
numberOfTimeToRemove * mockForwardedSubscriptionEventCache.remove('SCO-9989752cm-subscription-001')
and: 'a response outcome has been created'
- numberOfTimeToResponse * mockSubscriptionEventResponseOutcome.sendResponse(subscriptionResponseEvent, 'subscriptionCreated')
+ numberOfTimeToResponse * mockCmSubscriptionNcmpOutEventPublisher.sendResponse(_, 'subscriptionCreated')
where: 'the following values are used'
scenario | modelLoaderEnabled | notificationEnabled || numberOfTimeToPersist || numberOfTimeToRemove || numberOfTimeToResponse
'Both model loader and notification are enabled' | true | true || 1 || 1 || 1
'Model loader disabled and notification enabled' | false | true || 0 || 1 || 1
}
- def 'Consume Subscription Event Response where another DMI has not yet responded'() {
+ def 'Consume dmi out event where another DMI has not yet responded'() {
given: 'a subscription event response and notifications are enabled'
objectUnderTest.notificationFeatureEnabled = notificationEnabled
and: 'subscription model loader is enabled'
objectUnderTest.subscriptionModelLoaderEnabled = modelLoaderEnabled
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEventResponse(getConsumerRecord())
+ objectUnderTest.consumeDmiOutEvent(getDmiOutConsumerRecord())
then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
1 * mockForwardedSubscriptionEventCache.containsKey('SCO-9989752cm-subscription-001') >> true
1 * mockForwardedSubscriptionEventCache.get('SCO-9989752cm-subscription-001') >> (['responded-dmi', 'non-responded-dmi'] as Set)
and: 'the subscription event is not removed from the map'
0 * mockForwardedSubscriptionEventCache.remove(_)
and: 'a response outcome has not been created'
- 0 * mockSubscriptionEventResponseOutcome.sendResponse(*_)
+ 0 * mockCmSubscriptionNcmpOutEventPublisher.sendResponse(*_)
where: 'the following values are used'
scenario | modelLoaderEnabled | notificationEnabled || numberOfTimeToPersist
'Both model loader and notification are enabled' | true | true || 1
'Model loader disabled and notification enabled' | false | true || 0
}
- def getSubscriptionResponseEvent() {
- def subscriptionResponseJsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiOutEvent.json')
- return jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, CmSubscriptionDmiOutEvent.class)
+ def getDmiOutEvent() {
+ def cmSubscriptionDmiOutEventJsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiOutEvent.json')
+ return jsonObjectMapper.convertJsonString(cmSubscriptionDmiOutEventJsonData, CmSubscriptionDmiOutEvent.class)
}
- def getCloudEventHavingSubscriptionResponseEvent() {
+ def getCloudEvent() {
return CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(getSubscriptionResponseEvent()))
+ .withData(objectMapper.writeValueAsBytes(getDmiOutEvent()))
.withId('some-id')
.withType('subscriptionCreated')
.withSource(URI.create('NCMP')).build()
}
- def getConsumerRecord() {
- return new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', getCloudEventHavingSubscriptionResponseEvent())
+ def getDmiOutConsumerRecord() {
+ return new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', getCloudEvent())
}
def getDataNode() {
def leaves = [status:'ACCEPTED', cmHandleId:'cmhandle1'] as Map
- return new DataNodeBuilder().withDataspace('NCMP-Admin')
+ return new DataNodeBuilder().withDataspace(NCMP_DATASPACE_NAME)
.withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
.withLeaves(leaves).build()
}