import com.hazelcast.map.IMap
import okhttp3.mockwebserver.MockWebServer
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.api.CpsAnchorService
import org.onap.cps.api.CpsDataService
import org.onap.cps.api.CpsDataspaceService
import spock.lang.Shared
import spock.lang.Specification
+import java.time.Duration
import java.time.OffsetDateTime
import java.util.concurrent.BlockingQueue
@ActiveProfiles('module-sync-delayed')
abstract class CpsIntegrationSpecBase extends Specification {
+ static KafkaConsumer kafkaConsumer
+
@Shared
DatabaseTestContainer databaseTestContainer = DatabaseTestContainer.getInstance()
networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
}
+ def subscribeAndClearPreviousMessages(consumerGroupId, topicName) {
+ kafkaConsumer = KafkaTestContainer.getConsumer(consumerGroupId, StringDeserializer.class)
+ kafkaConsumer.subscribe([topicName])
+ kafkaConsumer.poll(Duration.ofMillis(500))
+ }
+
+ def getLatestConsumerRecordsWithMaxPollOf1Second(numberOfRecordsToRead) {
+ def consumerRecords = []
+ def retryAttempts = 10
+ while (consumerRecords.size() < numberOfRecordsToRead) {
+ retryAttempts--
+ consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100)))
+ if (retryAttempts == 0)
+ break
+ }
+ return consumerRecords
+ }
+
}
package org.onap.cps.integration.functional.ncmp.inventory
-import java.time.Duration
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.cps.integration.KafkaTestContainer
+
import org.onap.cps.integration.base.CpsIntegrationSpecBase
import org.onap.cps.ncmp.api.NcmpResponseStatus
-import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
+import org.onap.cps.ncmp.api.inventory.models.CmHandleState
import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
+import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
-import org.onap.cps.ncmp.api.inventory.models.CmHandleState
-import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
+import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
class CmHandleCreateSpec extends CpsIntegrationSpecBase {
NetworkCmProxyInventoryFacadeImpl objectUnderTest
def uniqueId = 'ch-unique-id-for-create-test'
- static KafkaConsumer kafkaConsumer
-
def setup() {
objectUnderTest = networkCmProxyInventoryFacade
- subscribeAndClearPreviousMessages()
+ subscribeAndClearPreviousMessages('test-group', 'ncmp-events')
}
def cleanup() {
dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2']
when: 'a CM-handle is registered for creation'
- def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId)
+ def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId, dataProducerIdentifier: 'my-data-producer-identifier')
def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
then: 'get the latest messages'
- def consumerRecords = getLatestConsumerRecords()
+ def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(2)
and: 'both converted messages are for the correct cm handle'
def notificationMessages = []
}
assert notificationMessages.event.cmHandleId == [ uniqueId, uniqueId ]
- and: 'the oldest event is about the update to ADVISED state'
- notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED'
+ and: 'the oldest event is about the update to ADVISED state and it has a data producer id'
+ assert notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED'
+ assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-identifier'
+
+ and: 'the next event is about update to READY state and it has the same data producer identifier as in in ADVISED state'
+ assert notificationMessages[1].event.newValues.cmHandleState.value() == 'READY'
+ assert notificationMessages[1].event.dataProducerIdentifier == 'my-data-producer-identifier'
- and: 'the next event is about update to READY state'
- notificationMessages[1].event.newValues.cmHandleState.value() == 'READY'
+ and: 'there are no more messages to be read'
+ assert getLatestConsumerRecordsWithMaxPollOf1Second(1).size() == 0
cleanup: 'deregister CM handle'
deregisterCmHandle(DMI1_URL, uniqueId)
deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2'])
}
- def subscribeAndClearPreviousMessages() {
- kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class)
- kafkaConsumer.subscribe(['ncmp-events'])
- kafkaConsumer.poll(Duration.ofMillis(500))
- }
-
- def getLatestConsumerRecords() {
- def consumerRecords = []
- def retryAttempts = 10
- while (consumerRecords.size() < 2) {
- retryAttempts--
- consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100)))
- if (retryAttempts == 0)
- break
- }
- return consumerRecords
- }
-
}
import org.onap.cps.integration.base.CpsIntegrationSpecBase
import org.onap.cps.ncmp.api.NcmpResponseStatus
-import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse
import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration
import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
+import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
class CmHandleUpdateSpec extends CpsIntegrationSpecBase {
def setup() {
objectUnderTest = networkCmProxyInventoryFacade
+ subscribeAndClearPreviousMessages('test-group-for-update', 'ncmp-events')
+ }
+
+ def cleanup() {
+ kafkaConsumer.unsubscribe()
+ kafkaConsumer.close()
}
def 'Update of CM-handle with new or unchanged alternate ID succeeds.'() {
deregisterCmHandle(DMI1_URL, 'ch-1')
}
+ def 'CM Handle registration to verify changes in data producer identifier'() {
+ given: 'DMI will return modules when requested'
+ def cmHandleId = 'ch-id-for-update'
+ dmiDispatcher1.moduleNamesPerCmHandleId[cmHandleId] = ['M1', 'M2']
+
+ when: 'a CM-handle is registered for creation'
+
+ def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: cmHandleId)
+ def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
+ def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
+
+ then: 'registration gives successful response'
+ assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
+
+ then: 'the module sync watchdog is triggered'
+ moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
+
+ and: 'flush the latest cm handle registration events( state transition from NONE to ADVISED and ADVISED to READY)'
+ getLatestConsumerRecordsWithMaxPollOf1Second(2)
+
+ and: 'cm handle updated with the data producer identifier'
+ def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, dataProducerIdentifier: 'my-data-producer-id')
+ def dmiPluginRegistrationForUpdate = new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate])
+ def dmiPluginRegistrationResponseForUpdate = objectUnderTest.updateDmiRegistration(dmiPluginRegistrationForUpdate)
+
+ then: 'registration gives successful response'
+ assert dmiPluginRegistrationResponseForUpdate.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)]
+
+ and: 'get the latest message'
+ def consumerRecords = getLatestConsumerRecordsWithMaxPollOf1Second(1)
+
+ and: 'the message has the updated data producer identifier'
+ def notificationMessages = []
+ for (def consumerRecord : consumerRecords) {
+ notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent))
+ }
+ assert notificationMessages[0].event.cmHandleId.contains(cmHandleId)
+ assert notificationMessages[0].event.dataProducerIdentifier == 'my-data-producer-id'
+
+ cleanup: 'deregister CM handle'
+ deregisterCmHandle(DMI1_URL, cmHandleId)
+ }
+
}