import static org.onap.cps.api.parameters.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
+import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import io.cloudevents.core.builder.CloudEventBuilder
import io.cloudevents.kafka.CloudEventSerializer
-import io.cloudevents.kafka.CloudEventDeserializer
import java.nio.charset.StandardCharsets
import java.time.Duration
import org.apache.kafka.clients.producer.ProducerRecord
def testResponseProducer
def listAppender = new ListAppender<ILoggingEvent>()
- def logger
+ def logger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer)
def setup() {
registerCmHandlesForSubscriptions()
dmiInConsumer.poll(Duration.ofMillis(500))
testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class)
- logger = LoggerFactory.getLogger(NcmpInEventConsumer)
listAppender.start()
logger.addAppender(listAppender)
}
def cleanup() {
+ logger.detachAndStopAllAppenders()
dmiInConsumer.unsubscribe()
dmiInConsumer.close()
testRequestProducer.close()
testResponseProducer.close()
kafkaTestContainer.close()
deregisterCmHandles('dmi-0', ['cmHandle0'])
- deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2'])
+ deregisterCmHandles('dmi-1', ['cmHandle1', 'cmHandle2', 'cmHandle5'])
deregisterCmHandles('dmi-2', ['cmHandle3', 'cmHandle4'])
}
def 'Create subscription and send to multiple DMIs'() {
- given: 'a data node selector on DMI-1'
+ given: 'data node selector with two paths on DMI-1'
def dmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
- and: 'a data node selector on DMI-2'
+ and: 'data node selector with one path on DMI-2'
def dmi2DataNodeSelector = '/parent[id=\\\"3\\\"]/child'
and: 'an event payload'
def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector)
- def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId', eventDataNodeSelector)
+ def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId01', eventDataNodeSelector)
when: 'a subscription create request is sent'
- sendSubscriptionCreateRequest(subscriptionTopic, 'key', eventPayload)
+ sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01')
then: 'log shows event is consumed by ncmp'
def messages = listAppender.list*.formattedMessage
- messages.any { msg -> msg.contains('myDataJobId') && msg.contains('dataJobCreated')}
+ messages.any { msg -> msg.contains('myDataJobId01') && msg.contains('dataJobCreated') }
and: 'the 3 different data node selectors for the given data job id is persisted'
- assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId').size() == 3
+ assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('myDataJobId01').size() == 3
and: 'get correlation ids from event sent to DMIs'
def correlationIds = getAllConsumedCorrelationIds()
and: 'there is correlation IDs (event) for each affected dmi (DMI-1, DMI-2)'
assert correlationIds.size() == 2
- assert correlationIds.containsAll(['myDataJobId#dmi-1', 'myDataJobId#dmi-2'])
+ assert correlationIds.containsAll(['myDataJobId01#dmi-1', 'myDataJobId01#dmi-2'])
}
- def 'Update subscription status'() {
+
+ def 'Create subscription accepted by DMI.'() {
given: 'a persisted subscription'
def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n')
- sendSubscriptionCreateRequest(subscriptionTopic, 'newDataJob', eventPayload)
+ sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob')
when: 'dmi accepts the subscription create request'
sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0')
then: 'there are no more inactive data node selector for given datajob id'
assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('newDataJob').size() == 0
and: 'status for the data node selector for given data job id is ACCEPTED'
- def affectedDataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
- '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY)
- assert affectedDataNodes.leaves.every( entry -> entry.get('status') == 'ACCEPTED')
+ def affectedDataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+ '//subscription/dataJobId[text()=\'newDataJob\']', DIRECT_CHILDREN_ONLY)
+ assert affectedDataNodes.leaves.every(entry -> entry.get('status') == 'ACCEPTED')
}
def 'Create new subscription which partially overlaps with an existing active subscription'() {
- given: 'an existing data node selector on DMI-1'
- def existingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n'''
- and: 'a new data node selector on DMI-2'
- def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]'
- and: 'an event payload'
- def eventDataNodeSelector = (existingDmi1DataNodeSelector + newDmi2DataNodeSelector)
- def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJob', eventDataNodeSelector)
- and: 'an active subscription in database'
+ given: 'an active subscription in database'
createAndAcceptSubscriptionA()
- when: 'a new subscription create request is sent'
- sendSubscriptionCreateRequest(subscriptionTopic, 'partialOverlappingDataJob', eventPayload)
+ and: 'and a partial overlapping subscription'
+ def overlappingDmi1DataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n'''
+ def newDmi2DataNodeSelector = '/parent[id=\\\"4\\\"]'
+ def eventDataNodeSelector = (overlappingDmi1DataNodeSelector + newDmi2DataNodeSelector)
+ def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJobId', eventDataNodeSelector)
+ when: 'create request event for overlapping subscription is sent'
+ sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId')
then: 'log shows event is consumed by ncmp'
def messages = listAppender.list*.formattedMessage
- messages.any { msg -> msg.contains('partialOverlappingDataJob') && msg.contains('dataJobCreated')}
+ messages.any { msg -> msg.contains('partialOverlappingDataJobId') && msg.contains('dataJobCreated') }
and: 'the 3 data node selectors for the given data job id is persisted'
assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
- '//subscription/dataJobId[text()=\'partialOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 3
+ '//subscription/dataJobId[text()=\'partialOverlappingDataJobId\']', DIRECT_CHILDREN_ONLY).size() == 3
and: 'only one data node selector is not active'
- assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJob').size() == 1
+ assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('partialOverlappingDataJobId').size() == 1
and: 'get correlation ids from event sent to DMIs'
def correlationIds = getAllConsumedCorrelationIds()
and: 'there is correlation IDs (event) for only the affected dmi (DMI-2)'
- assert !correlationIds.contains('partialOverlappingDataJob#dmi-1')
- assert correlationIds.contains('partialOverlappingDataJob#dmi-2')
+ assert !correlationIds.contains('partialOverlappingDataJobId#dmi-1')
+ assert correlationIds.contains('partialOverlappingDataJobId#dmi-2')
}
def 'Create new subscription which completely overlaps with an active existing subscriptions'() {
createAndAcceptSubscriptionA()
createAndAcceptSubscriptionB()
when: 'a new subscription create request is sent'
- sendSubscriptionCreateRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload)
+ sendSubscriptionRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload, 'myDataJobId')
then: 'log shows event is consumed by ncmp'
def messages = listAppender.list*.formattedMessage
- messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated')}
+ messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated') }
and: 'the 2 data node selectors for the given data job id is persisted'
assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
- '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
+ '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
and: 'there are no inactive data node selector'
assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0
and: 'get correlation ids from event sent to DMIs'
assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') }
}
+ def 'Delete subscription removes last subscriber.'() {
+ given: 'an existing subscription with only one data node selector'
+ def dataNodeSelector = '/parent[id=\\\"5\\\"]'
+ and: 'a subscription created'
+ def createEventPayload = createSubscriptionEventPayload('dataJobCreated', 'lastDataJobId', dataNodeSelector)
+ sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId')
+ and: 'data nodes is persisted '
+ def dataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+ "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
+ assert dataNodes.size() == 1
+ assert dataNodes.iterator().next().leaves.dataNodeSelector == '/parent[id="5"]'
+ when: 'a delete event is received for the subscription'
+ def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'lastDataJobId', dataNodeSelector)
+ sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId')
+ then: 'the subscription is fully removed from persistence'
+ def remainingDataNodeSelector = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+ "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
+ assert remainingDataNodeSelector.isEmpty()
+ and: 'no other subscriptions exist for the same dataJobId'
+ def remainingDataJobId = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+ "//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
+ assert remainingDataJobId.isEmpty()
+ and: 'a DMI delete event is published for the affected DMI'
+ def correlationIds = getAllConsumedCorrelationIds()
+ assert correlationIds.contains('lastDataJobId#dmi-1')
+ }
+
+ def 'Delete subscription removes one of multiple subscribers.'() {
+ given: 'data node selector that is used by other subscriptions'
+ def dataNodeSelector = '/parent[id=\\\"1\\\"]'
+ def existingSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+ "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId
+ assert !existingSubscription.isEmpty()
+ and: 'a new subscription'
+ def createEventPayload1 = createSubscriptionEventPayload('dataJobCreated', 'id-to-remove', dataNodeSelector)
+ sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove')
+ when: 'a delete event is received'
+ def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'id-to-remove', dataNodeSelector)
+ sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove')
+ then: 'the data job id does not exist in database'
+ def resultForDeletedSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+ "//subscription/dataJobId[text()='id-to-remove']", DIRECT_CHILDREN_ONLY)
+ assert resultForDeletedSubscription.isEmpty()
+ and: 'subscription still exist for the same data node selector'
+ def remainingSubscriptions = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
+ "/dataJob/subscription[@dataNodeSelector='/parent[id=\"1\"]']", DIRECT_CHILDREN_ONLY).iterator().next().leaves.dataJobId
+ assert !remainingSubscriptions.isEmpty()
+ assert !remainingSubscriptions.contains('id-to-remove')
+ and: 'no DMI delete event is published'
+ def correlationIds = getAllConsumedCorrelationIds()
+ assert !correlationIds.contains(['id-to-remove#dmi-1'])
+ }
+
+ def 'Deleting non-existent subscription.'() {
+ given: 'an event payload'
+ def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'nonExistingDataJobId', '/nonExisting')
+ when: 'a delete event is received for the non-existent subscription'
+ sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'myDataJobId')
+ then: 'no exception is thrown'
+ noExceptionThrown()
+ and: 'nothing is sent to DMI'
+ getAllConsumedCorrelationIds().isEmpty()
+ }
+
def registerCmHandlesForSubscriptions() {
- registerCmHandle('dmi-0', 'cmHandle0', '','/parent=0')
- registerCmHandle('dmi-1', 'cmHandle1', '','/parent=1')
- registerCmHandle('dmi-1', 'cmHandle2', '','/parent=2')
- registerCmHandle('dmi-2', 'cmHandle3', '','/parent=3')
- registerCmHandle('dmi-2', 'cmHandle4', '','/parent=4')
+ registerCmHandle('dmi-0', 'cmHandle0', '', '/parent=0')
+ registerCmHandle('dmi-1', 'cmHandle1', '', '/parent=1')
+ registerCmHandle('dmi-1', 'cmHandle2', '', '/parent=2')
+ registerCmHandle('dmi-2', 'cmHandle3', '', '/parent=3')
+ registerCmHandle('dmi-2', 'cmHandle4', '', '/parent=4')
+ registerCmHandle('dmi-1', 'cmHandle5', '', '/parent=5')
}
def createSubscriptionEventPayload(eventType, dataJobId, dataNodeSelector) {
def createAndAcceptSubscriptionA() {
def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child'''
def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector)
- sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobA', eventPayload)
+ sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA')
sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1')
sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2')
}
def createAndAcceptSubscriptionB() {
def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]'''
def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector)
- sendSubscriptionCreateRequest(subscriptionTopic, 'dataJobB', eventPayload)
+ sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB')
sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2')
}
- def sendSubscriptionCreateRequest(topic, eventKey, eventPayload) {
- def event = new ProducerRecord<>(topic, eventKey, eventPayload);
+ def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId) {
+ def event = new ProducerRecord<>(topic, eventKey, eventPayload)
testRequestProducer.send(event)
- sleep(1000)
+ //TODO Add polling within for log to report dataJobId is finished (separate commit)
+ sleep(2000)
}
def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) {
- def eventPayload = readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json')
+ def eventPayload = readResourceDataFile('datajobs/subscription/dmiSubscriptionResponseEvent.json')
eventPayload = eventPayload.replace('#statusCode', statusCode)
eventPayload = eventPayload.replace('#statusMessage', statusMessage)
def cloudEvent = CloudEventBuilder.v1()
- .withData(eventPayload.getBytes(StandardCharsets.UTF_8))
- .withId('random-uuid')
- .withType(eventType)
- .withSource(URI.create(eventSource))
- .withExtension('correlationid', correlationId).build()
- def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent);
+ .withData(eventPayload.getBytes(StandardCharsets.UTF_8))
+ .withId('random-uuid')
+ .withType(eventType)
+ .withSource(URI.create(eventSource))
+ .withExtension('correlationid', correlationId).build()
+ def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent)
testResponseProducer.send(event)
sleep(2000)
}
def value = new String(header.value())
headersMap[key] << value
}
-
}
return headersMap
}