/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2021 Bell Canada.
+ * Copyright (c) 2021-2022 Bell Canada.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.onap.cps.utils.DateTimeUtility
import org.onap.cps.api.CpsAdminService
import org.onap.cps.api.CpsDataService
+import org.onap.cps.event.model.Content
import org.onap.cps.event.model.Data
import org.onap.cps.spi.FetchDescendantsOption
import org.onap.cps.spi.model.Anchor
def dateTimeFormat = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ'
def 'Create a CPS data updated event successfully: #scenario'() {
-
given: 'cps admin service is able to return anchor details'
mockCpsAdminService.getAnchor(myDataspaceName, myAnchorName) >>
new Anchor(myAnchorName, myDataspaceName, mySchemasetName)
def dataNode = new DataNodeBuilder().withXpath(xpath).withLeaves(['leafName': 'leafValue']).build()
mockCpsDataService.getDataNode(
myDataspaceName, myAnchorName, xpath, FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> dataNode
-
when: 'CPS data updated event is created'
def cpsDataUpdatedEvent = objectUnderTest.createCpsDataUpdatedEvent(myDataspaceName,
- myAnchorName, DateTimeUtility.toOffsetDateTime(inputObservedTimestamp))
-
+ myAnchorName, DateTimeUtility.toOffsetDateTime(inputObservedTimestamp), Operation.CREATE)
then: 'CPS data updated event is created with correct envelope'
-
with(cpsDataUpdatedEvent) {
type == 'org.onap.cps.data-updated-event'
source == new URI('urn:cps:org.onap.cps')
assert anchorName == myAnchorName
assert dataspaceName == myDataspaceName
assert schemaSetName == mySchemasetName
+ assert operation == Content.Operation.CREATE
assert data == new Data().withAdditionalProperty('leafName', 'leafValue')
}
where: