Subscription Create Event Outcome Kafka Part
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / events / avcsubscription / SubscriptionEventForwarderSpec.groovy
index a3dec29..63ddcef 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.hazelcast.map.IMap
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence
@@ -35,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import spock.util.concurrent.BlockingVariable
 
+import java.util.concurrent.TimeUnit
+
 @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionEventForwarder])
 class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
 
@@ -47,7 +50,8 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
     EventsPublisher<SubscriptionEvent> mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>)
     @SpringBean
     IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
-
+    @SpringBean
+    SubscriptionEventResponseOutcome mockSubscriptionEventResponseOutcome = Mock(SubscriptionEventResponseOutcome)
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
@@ -55,6 +59,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
         given: 'an event'
             def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+            def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
         and: 'the InventoryPersistence returns private properties for the supplied CM Handles'
             1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> [
                 createYangModelCmHandleWithDmiProperty(1, 1,"shape","circle"),
@@ -66,44 +71,46 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
         and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
             def block = new BlockingVariable<Object>(5)
         when: 'the valid event is forwarded'
-            objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+            objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
         then: 'An asynchronous call is made to the blocking variable'
             block.get()
         then: 'the event is added to the forwarded subscription event cache'
-            1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set)
+            1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set, 600, TimeUnit.SECONDS)
         and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'
             1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1",
-                subscriptionEvent -> {
+                consumerRecord.headers(), subscriptionEvent -> {
                     Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0)
                     targets["CMHandle1"] == ["shape":"circle"]
                     targets["CMHandle2"] == ["shape":"square"]
                 }
             )
             1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName2", "SCO-9989752-cm-subscription-001-DMIName2",
-                subscriptionEvent -> {
+                consumerRecord.headers(), subscriptionEvent -> {
                     Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0)
                     targets["CMHandle3"] == ["shape":"triangle"]
                 }
             )
         and: 'a separate thread has been created where the map is polled'
             1 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true
-            1 * mockForwardedSubscriptionEventCache.get(_) >> (DMINamesInMap)
+            1 * mockForwardedSubscriptionEventCache.get(_) >> DMINamesInMap
+            1 * mockSubscriptionEventResponseOutcome.sendResponse(*_)
         and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable'
             1 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)}
         where:
             scenario                                  | DMINamesInMap
             'there are dmis which have not responded' | ["DMIName1", "DMIName2"] as Set
-            'all dmis have responded '                | [] as Set
+            'all dmis have responded                | [] as Set
     }
 
     def 'Forward CM create subscription where target CM Handles are #scenario'() {
         given: 'an event'
             def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+            def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
         and: 'the target CMHandles are set to #scenario'
             testEventSent.getEvent().getPredicates().setTargets(invalidTargets)
         when: 'the event is forwarded'
-            objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+            objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
         then: 'an operation not yet supported exception is thrown'
             thrown(OperationNotYetSupportedException)
         where:
@@ -117,6 +124,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
         given: 'an event'
             def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+            def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
         and: 'the InventoryPersistence returns no private properties for the supplied CM Handles'
             1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> []
         and: 'the thread creation delay is reduced to 2 seconds for testing'
@@ -124,7 +132,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
         and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
             def block = new BlockingVariable<Object>(5)
         when: 'the valid event is forwarded'
-            objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+            objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
         then: 'the event is not added to the forwarded subscription event cache'
             0 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set)
         and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'