import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.onap.cps.integration.base.CpsIntegrationSpecBase
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventConsumer
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.NcmpInEventConsumer
import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
+import spock.util.concurrent.PollingConditions
class CmSubscriptionSpec extends CpsIntegrationSpecBase {
def testResponseProducer
def listAppender = new ListAppender<ILoggingEvent>()
- def logger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer)
+ def ncmpInEventLogger = (Logger) LoggerFactory.getLogger(NcmpInEventConsumer)
+ def dmiEventConsumerLogger = (Logger) LoggerFactory.getLogger(EventConsumer)
def setup() {
registerCmHandlesForSubscriptions()
testRequestProducer = kafkaTestContainer.createProducer('test-client-id', StringSerializer.class)
testResponseProducer = kafkaTestContainer.createProducer('test-client-id', CloudEventSerializer.class)
listAppender.start()
- logger.addAppender(listAppender)
+ ncmpInEventLogger.addAppender(listAppender)
+ dmiEventConsumerLogger.addAppender(listAppender)
}
def cleanup() {
- logger.detachAndStopAllAppenders()
+ ncmpInEventLogger.detachAndStopAllAppenders()
+ dmiEventConsumerLogger.detachAndStopAllAppenders()
dmiInConsumer.unsubscribe()
dmiInConsumer.close()
testRequestProducer.close()
def eventDataNodeSelector = (dmi1DataNodeSelector + dmi2DataNodeSelector)
def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'myDataJobId01', eventDataNodeSelector)
when: 'a subscription create request is sent'
- sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01')
+ sendSubscriptionRequest(subscriptionTopic, 'key', eventPayload, 'myDataJobId01', 'dataJobCreated')
then: 'log shows event is consumed by ncmp'
def messages = listAppender.list*.formattedMessage
messages.any { msg -> msg.contains('myDataJobId01') && msg.contains('dataJobCreated') }
def 'Create subscription accepted by DMI.'() {
given: 'a persisted subscription'
- def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\'0\\\']\\n')
- sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob')
+ def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'newDataJob', '/parent[id=\\\"0\\\"]\\n')
+ sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'newDataJob', 'dataJobCreated')
when: 'dmi accepts the subscription create request'
sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-0', 'newDataJob#dmi-0')
then: 'there are no more inactive data node selector for given datajob id'
def eventDataNodeSelector = (overlappingDmi1DataNodeSelector + newDmi2DataNodeSelector)
def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'partialOverlappingDataJobId', eventDataNodeSelector)
when: 'create request event for overlapping subscription is sent'
- sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId')
+ sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'partialOverlappingDataJobId', 'dataJobCreated')
then: 'log shows event is consumed by ncmp'
def messages = listAppender.list*.formattedMessage
messages.any { msg -> msg.contains('partialOverlappingDataJobId') && msg.contains('dataJobCreated') }
given: 'a new data node selector'
def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n'''
and: 'an event payload'
- def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJob', dataNodeSelector)
+ def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'fullyOverlappingDataJobId', dataNodeSelector)
and: 'existing active subscriptions in database'
createAndAcceptSubscriptionA()
createAndAcceptSubscriptionB()
when: 'a new subscription create request is sent'
- sendSubscriptionRequest(subscriptionTopic, 'fullyOverlappingDataJob', eventPayload, 'myDataJobId')
+ sendSubscriptionRequest(subscriptionTopic, 'some key', eventPayload, 'fullyOverlappingDataJobId', 'dataJobCreated')
then: 'log shows event is consumed by ncmp'
def messages = listAppender.list*.formattedMessage
- messages.any { msg -> msg.contains('fullyOverlappingDataJob') && msg.contains('dataJobCreated') }
+ messages.any { msg -> msg.contains('fullyOverlappingDataJobId') && msg.contains('dataJobCreated') }
and: 'the 2 data node selectors for the given data job id is persisted'
assert cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
- '//subscription/dataJobId[text()=\'fullyOverlappingDataJob\']', DIRECT_CHILDREN_ONLY).size() == 2
+ '//subscription/dataJobId[text()=\'fullyOverlappingDataJobId\']', DIRECT_CHILDREN_ONLY).size() == 2
and: 'there are no inactive data node selector'
- assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJob').size() == 0
+ assert cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors('fullyOverlappingDataJobId').size() == 0
and: 'get correlation ids from event sent to DMIs'
def correlationIds = getAllConsumedCorrelationIds()
and: 'there is no correlation IDs (event) for any dmi'
- assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJob') }
+ assert !correlationIds.any { correlationId -> correlationId.startsWith('fullyOverlappingDataJobId') }
}
def 'Delete subscription removes last subscriber.'() {
def dataNodeSelector = '/parent[id=\\\"5\\\"]'
and: 'a subscription created'
def createEventPayload = createSubscriptionEventPayload('dataJobCreated', 'lastDataJobId', dataNodeSelector)
- sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId')
+ sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', createEventPayload, 'lastDataJobId', 'dataJobCreated')
and: 'data nodes is persisted '
def dataNodes = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
"//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
assert dataNodes.iterator().next().leaves.dataNodeSelector == '/parent[id="5"]'
when: 'a delete event is received for the subscription'
def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'lastDataJobId', dataNodeSelector)
- sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId')
+ sendSubscriptionRequest(subscriptionTopic, 'lastDataJobId', deleteEventPayload, 'lastDataJobId', 'dataJobDeleted')
then: 'the subscription is fully removed from persistence'
def remainingDataNodeSelector = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
"//subscription/dataJobId[text()=\'lastDataJobId\']", DIRECT_CHILDREN_ONLY)
assert !existingSubscription.isEmpty()
and: 'a new subscription'
def createEventPayload1 = createSubscriptionEventPayload('dataJobCreated', 'id-to-remove', dataNodeSelector)
- sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove')
+ sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', createEventPayload1, 'id-to-remove', 'dataJobCreated')
when: 'a delete event is received'
def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'id-to-remove', dataNodeSelector)
- sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove')
+ sendSubscriptionRequest(subscriptionTopic, 'id-to-remove', deleteEventPayload, 'id-to-remove', 'dataJobDeleted')
then: 'the data job id does not exist in database'
def resultForDeletedSubscription = cpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-job-subscriptions',
"//subscription/dataJobId[text()='id-to-remove']", DIRECT_CHILDREN_ONLY)
given: 'an event payload'
def deleteEventPayload = createSubscriptionEventPayload('dataJobDeleted', 'nonExistingDataJobId', '/nonExisting')
when: 'a delete event is received for the non-existent subscription'
- sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'myDataJobId')
+ sendSubscriptionRequest(subscriptionTopic, 'nonExistingDataJobId', deleteEventPayload, 'nonExistingDataJobId', 'dataJobDeleted')
then: 'no exception is thrown'
noExceptionThrown()
and: 'nothing is sent to DMI'
def createAndAcceptSubscriptionA() {
def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"2\\\"]/child\\n/parent[id=\\\"3\\\"]/child'''
def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobA', dataNodeSelector)
- sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA')
+ sendSubscriptionRequest(subscriptionTopic, 'dataJobA', eventPayload, 'dataJobA', 'dataJobCreated')
sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-1', 'dataJobA#dmi-1')
sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobA#dmi-2')
}
def createAndAcceptSubscriptionB() {
def dataNodeSelector = '''/parent[id=\\\"1\\\"]\\n/parent[id=\\\"3\\\"]/child\\n/parent[id=\\\"4\\\"]'''
def eventPayload = createSubscriptionEventPayload('dataJobCreated', 'dataJobB', dataNodeSelector)
- sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB')
+ sendSubscriptionRequest(subscriptionTopic, 'dataJobB', eventPayload, 'dataJobB', 'dataJobCreated')
sendDmiResponse('1', 'ACCEPTED', 'subscriptionCreateResponse', 'dmi-2', 'dataJobB#dmi-2')
}
- def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId) {
+ def sendSubscriptionRequest(topic, eventKey, eventPayload, dataJobId, eventType) {
def event = new ProducerRecord<>(topic, eventKey, eventPayload)
testRequestProducer.send(event)
- //TODO Add polling within for log to report dataJobId is finished (separate commit)
- sleep(2000)
+ def expectedMessageWhenFinishedProcessingEvent = 'NCMP In Event with eventType=' + eventType + ' has been Processed for dataJobId='+ dataJobId
+ assertEventProcessedBasedOnLogging(expectedMessageWhenFinishedProcessingEvent)
}
def sendDmiResponse(statusCode, statusMessage, eventType, eventSource, correlationId) {
.withExtension('correlationid', correlationId).build()
def event = new ProducerRecord<>(dmiOutTopic, 'key', cloudEvent)
testResponseProducer.send(event)
- sleep(2000)
+ def expectedMessageWhenFinishedProcessingEvent = 'Finished processing DMI subscription response event with details: | correlationId=' + correlationId + ' | eventType=' + eventType
+ assertEventProcessedBasedOnLogging(expectedMessageWhenFinishedProcessingEvent)
+ }
+
+ def assertEventProcessedBasedOnLogging(expectedMessageInLog) {
+ new PollingConditions().within(2) {
+ def messages = listAppender.list*.formattedMessage
+ def eventProcessed = messages.any { msg -> msg.contains(expectedMessageInLog) }
+ assert eventProcessed
+ }
}
def getAllConsumedCorrelationIds() {