import com.fasterxml.jackson.databind.ObjectMapper
import com.hazelcast.map.IMap
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.inventory.InventoryPersistence
import org.springframework.boot.test.context.SpringBootTest
import spock.util.concurrent.BlockingVariable
+import java.util.concurrent.TimeUnit
+
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionEventForwarder])
class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
EventsPublisher<SubscriptionEvent> mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>)
@SpringBean
IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
-
+ @SpringBean
+ SubscriptionEventResponseOutcome mockSubscriptionEventResponseOutcome = Mock(SubscriptionEventResponseOutcome)
@Autowired
JsonObjectMapper jsonObjectMapper
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'the InventoryPersistence returns private properties for the supplied CM Handles'
1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> [
createYangModelCmHandleWithDmiProperty(1, 1,"shape","circle"),
and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
def block = new BlockingVariable<Object>(5)
when: 'the valid event is forwarded'
- objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+ objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
then: 'An asynchronous call is made to the blocking variable'
block.get()
then: 'the event is added to the forwarded subscription event cache'
- 1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set)
+ 1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set, 600, TimeUnit.SECONDS)
and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'
1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1",
- subscriptionEvent -> {
+ consumerRecord.headers(), subscriptionEvent -> {
Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0)
targets["CMHandle1"] == ["shape":"circle"]
targets["CMHandle2"] == ["shape":"square"]
}
)
1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName2", "SCO-9989752-cm-subscription-001-DMIName2",
- subscriptionEvent -> {
+ consumerRecord.headers(), subscriptionEvent -> {
Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0)
targets["CMHandle3"] == ["shape":"triangle"]
}
)
and: 'a separate thread has been created where the map is polled'
1 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true
- 1 * mockForwardedSubscriptionEventCache.get(_) >> (DMINamesInMap)
+ 1 * mockForwardedSubscriptionEventCache.get(_) >> DMINamesInMap
+ 1 * mockSubscriptionEventResponseOutcome.sendResponse(*_)
and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable'
1 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)}
where:
scenario | DMINamesInMap
'there are dmis which have not responded' | ["DMIName1", "DMIName2"] as Set
- 'all dmis have responded ' | [] as Set
+ 'all dmis have responded' | [] as Set
}
def 'Forward CM create subscription where target CM Handles are #scenario'() {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'the target CMHandles are set to #scenario'
testEventSent.getEvent().getPredicates().setTargets(invalidTargets)
when: 'the event is forwarded'
- objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+ objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
then: 'an operation not yet supported exception is thrown'
thrown(OperationNotYetSupportedException)
where:
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'the InventoryPersistence returns no private properties for the supplied CM Handles'
1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> []
and: 'the thread creation delay is reduced to 2 seconds for testing'
and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
def block = new BlockingVariable<Object>(5)
when: 'the valid event is forwarded'
- objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+ objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
then: 'the event is not added to the forwarded subscription event cache'
0 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set)
and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'