{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "urn:cps:org.onap.cps:data-updated-event-schema:1.0.0",
- "$ref": "#/definitions/CpsDataUpdatedEvent",
+ "$ref": "#/definitions/CpsDataUpdateEvent",
"definitions": {
- "CpsDataUpdatedEvent": {
+ "CpsDataUpdateEvent": {
"description": "The payload for CPS data updated event.",
"type": "object",
"javaType": "org.onap.cps.events.model.CpsDataUpdatedEvent",
"properties": {
- "data": {
+ "eventPayload": {
"type": "object",
"properties": {
"observedTimestamp": {
"description": "The name of CPS Core anchor the data is attached to.",
"type": "string"
},
- "operation": {
+ "action": {
"description": "The operation on the data",
"type": "string",
"enum": [
- "CREATE",
- "UPDATE",
- "DELETE"
+ "create",
+ "replace",
+ "remove"
]
},
"xpath": {
- "description": "xpath of the updated content",
+ "description": "xpath of the modified content",
"type": "string"
}
},
package org.onap.cps.events;
+import static org.onap.cps.events.model.EventPayload.Action.fromValue;
+
import io.cloudevents.CloudEvent;
import io.micrometer.core.annotation.Timed;
import java.time.OffsetDateTime;
import org.onap.cps.api.CpsNotificationService;
import org.onap.cps.api.model.Anchor;
import org.onap.cps.events.model.CpsDataUpdatedEvent;
-import org.onap.cps.events.model.Data;
-import org.onap.cps.events.model.Data.Operation;
+import org.onap.cps.events.model.EventPayload;
import org.onap.cps.utils.DateTimeUtility;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* Send the cps data update event with header to the public topic.
*
- * @param anchor Anchor of the updated data
- * @param xpath xpath of the updated data
- * @param operation operation performed on the data
+ * @param anchor Anchor of the updated data
+ * @param xpath xpath of the updated data
+ * @param action operation performed on the data
* @param observedTimestamp timestamp when data was updated.
*/
@Timed(value = "cps.data.update.events.send", description = "Time taken to send Data Update event")
public void sendCpsDataUpdateEvent(final Anchor anchor, final String xpath,
- final Operation operation, final OffsetDateTime observedTimestamp) {
+ final String action, final OffsetDateTime observedTimestamp) {
if (notificationsEnabled && cpsChangeEventNotificationsEnabled && isNotificationEnabledForAnchor(anchor)) {
- final CpsDataUpdatedEvent cpsDataUpdatedEvent = createCpsDataUpdatedEvent(anchor,
- observedTimestamp, xpath, operation);
+ final CpsDataUpdatedEvent cpsDataUpdatedEvent =
+ createCpsDataUpdatedEvent(anchor, observedTimestamp, xpath, action);
final String updateEventId = anchor.getDataspaceName() + ":" + anchor.getName();
final Map<String, String> extensions = createUpdateEventExtensions(updateEventId);
final CloudEvent cpsDataUpdatedEventAsCloudEvent =
}
private CpsDataUpdatedEvent createCpsDataUpdatedEvent(final Anchor anchor, final OffsetDateTime observedTimestamp,
- final String xpath,
- final Operation rootNodeOperation) {
+ final String xpath, final String action) {
final CpsDataUpdatedEvent cpsDataUpdatedEvent = new CpsDataUpdatedEvent();
- final Data updateEventData = new Data();
+ final EventPayload updateEventData = new EventPayload();
updateEventData.setObservedTimestamp(DateTimeUtility.toString(observedTimestamp));
updateEventData.setDataspaceName(anchor.getDataspaceName());
updateEventData.setAnchorName(anchor.getName());
updateEventData.setSchemaSetName(anchor.getSchemaSetName());
- updateEventData.setOperation(getRootNodeOperation(xpath, rootNodeOperation));
+ updateEventData.setAction(fromValue(action));
updateEventData.setXpath(xpath);
- cpsDataUpdatedEvent.setData(updateEventData);
+ cpsDataUpdatedEvent.setEventPayload(updateEventData);
return cpsDataUpdatedEvent;
}
extensions.put("correlationid", eventKey);
return extensions;
}
-
- private Operation getRootNodeOperation(final String xpath, final Operation operation) {
- return isRootXpath(xpath) || isRootContainerNodeXpath(xpath) ? operation : Operation.UPDATE;
- }
-
- private static boolean isRootXpath(final String xpath) {
- return "/".equals(xpath) || "".equals(xpath);
- }
-
- private static boolean isRootContainerNodeXpath(final String xpath) {
- return 0 == xpath.lastIndexOf('/');
- }
}
import static org.onap.cps.cpspath.parser.CpsPathUtil.NO_PARENT_PATH;
import static org.onap.cps.cpspath.parser.CpsPathUtil.ROOT_NODE_XPATH;
+import static org.onap.cps.events.model.EventPayload.Action.CREATE;
+import static org.onap.cps.events.model.EventPayload.Action.REMOVE;
+import static org.onap.cps.events.model.EventPayload.Action.REPLACE;
import static org.onap.cps.utils.ContentType.JSON;
import io.micrometer.core.annotation.Timed;
import org.onap.cps.api.parameters.FetchDescendantsOption;
import org.onap.cps.cpspath.parser.CpsPathUtil;
import org.onap.cps.events.CpsDataUpdateEventsProducer;
-import org.onap.cps.events.model.Data.Operation;
import org.onap.cps.spi.CpsDataPersistenceService;
import org.onap.cps.utils.ContentType;
import org.onap.cps.utils.CpsValidator;
public class CpsDataServiceImpl implements CpsDataService {
private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
+ private static final String CREATE_ACTION = CREATE.value();
+ private static final String REMOVE_ACTION = REMOVE.value();
+ private static final String REPLACE_ACTION = REPLACE.value();
private final CpsDataPersistenceService cpsDataPersistenceService;
private final CpsDataUpdateEventsProducer cpsDataUpdateEventsProducer;
final Collection<DataNode> dataNodes = dataNodeFactory
.createDataNodesWithAnchorParentXpathAndNodeData(anchor, ROOT_NODE_XPATH, nodeData, contentType);
cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
- sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.CREATE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, CREATE_ACTION, observedTimestamp);
}
@Override
final Collection<DataNode> dataNodes = dataNodeFactory
.createDataNodesWithAnchorParentXpathAndNodeData(anchor, parentNodeXpath, nodeData, contentType);
cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.CREATE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, CREATE_ACTION, observedTimestamp);
}
@Override
cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
listElementDataNodeCollection);
}
- sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
}
@Override
final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
.collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
- sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
}
@Override
for (final DataNode dataNodeUpdate : dataNodeUpdates) {
processDataNodeUpdate(anchor, dataNodeUpdate);
}
- sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
}
@Override
final Collection<DataNode> dataNodes = dataNodeFactory
.createDataNodesWithAnchorParentXpathAndNodeData(anchor, parentNodeXpath, nodeData, contentType);
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
- sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
}
@Override
.createDataNodesWithAnchorAndXpathToNodeData(anchor, nodeDataPerParentNodeXPath, contentType);
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
nodeDataPerParentNodeXPath.keySet().forEach(nodeXpath ->
- sendDataUpdatedEvent(anchor, nodeXpath, Operation.UPDATE, observedTimestamp));
+ sendDataUpdatedEvent(anchor, nodeXpath, REPLACE_ACTION, observedTimestamp));
}
@Override
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
}
@Override
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
- sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, dataNodeXpath, REMOVE_ACTION, observedTimestamp);
}
@Override
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
dataNodeXpaths.forEach(dataNodeXpath ->
- sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp));
+ sendDataUpdatedEvent(anchor, dataNodeXpath, REMOVE_ACTION, observedTimestamp));
}
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
- sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, REMOVE_ACTION, observedTimestamp);
}
@Override
cpsValidator.validateNameCharacters(anchorNames);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
for (final Anchor anchor : cpsAnchorService.getAnchors(dataspaceName, anchorNames)) {
- sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, REMOVE_ACTION, observedTimestamp);
}
}
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
- sendDataUpdatedEvent(anchor, listNodeXpath, Operation.DELETE, observedTimestamp);
+ sendDataUpdatedEvent(anchor, listNodeXpath, REMOVE_ACTION, observedTimestamp);
}
@Override
private void sendDataUpdatedEvent(final Anchor anchor,
final String xpath,
- final Operation operation,
+ final String action,
final OffsetDateTime observedTimestamp) {
try {
- cpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp);
+ cpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, xpath, action, observedTimestamp);
} catch (final Exception exception) {
log.error("Failed to send message to notification service", exception);
}
import java.time.OffsetDateTime
-import static org.onap.cps.events.model.Data.Operation.CREATE
-import static org.onap.cps.events.model.Data.Operation.DELETE
-import static org.onap.cps.events.model.Data.Operation.UPDATE
+import static org.onap.cps.events.model.EventPayload.Action.CREATE
+import static org.onap.cps.events.model.EventPayload.Action.REMOVE
+import static org.onap.cps.events.model.EventPayload.Action.REPLACE
@ContextConfiguration(classes = [ObjectMapper, JsonObjectMapper])
class CpsDataUpdateEventsProducerSpec extends Specification {
+
+ static def CREATE_ACTION = CREATE.value()
+ static def REPLACE_ACTION = REPLACE.value()
+ static def REMOVE_ACTION = REMOVE.value()
+
def mockEventsProducer = Mock(EventsProducer)
def objectMapper = new ObjectMapper();
def mockCpsNotificationService = Mock(CpsNotificationService)
objectUnderTest.topicName = 'cps-core-event'
}
- def 'Create and send cps update event where events are #scenario.'() {
- given: 'an anchor, operation and observed timestamp'
+ def 'Create and send cps event with #scenario.'() {
+ given: 'an anchor'
def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
- def operation = operationInRequest
- def observedTimestamp = OffsetDateTime.now()
and: 'notificationsEnabled is #notificationsEnabled and it will be true as default'
objectUnderTest.notificationsEnabled = true
and: 'cpsChangeEventNotificationsEnabled is also true'
objectUnderTest.cpsChangeEventNotificationsEnabled = true
when: 'service is called to send data update event'
- objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp)
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, actionInRequest, OffsetDateTime.now())
then: 'the event contains the required attributes'
1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
args ->
assert cpsDataUpdatedEvent.getExtension('correlationid') == 'dataspace01:anchor01'
assert cpsDataUpdatedEvent.type == 'org.onap.cps.events.model.CpsDataUpdatedEvent'
assert cpsDataUpdatedEvent.source.toString() == 'CPS'
- def actualEventOperation = CloudEventUtils.mapData(cpsDataUpdatedEvent, PojoCloudEventDataMapper.from(objectMapper, CpsDataUpdatedEvent.class)).getValue().data.operation
- assert actualEventOperation == expectedOperation
+ def actualEventOperation = CloudEventUtils.mapData(cpsDataUpdatedEvent, PojoCloudEventDataMapper.from(objectMapper, CpsDataUpdatedEvent.class)).getValue().eventPayload.action.value()
+ assert actualEventOperation == expectedAction
}
}
where: 'the following values are used'
- scenario | xpath | operationInRequest || expectedOperation
- 'empty xpath' | '' | CREATE || CREATE
- 'root xpath and create operation' | '/' | CREATE || CREATE
- 'root xpath and update operation' | '/' | UPDATE || UPDATE
- 'root xpath and delete operation' | '/' | DELETE || DELETE
- 'not root xpath and update operation' | 'test' | UPDATE || UPDATE
- 'root node xpath and create operation' | '/test' | CREATE || CREATE
- 'non root node xpath and update operation' | '/test/path' | CREATE || UPDATE
- 'non root node xpath and delete operation' | '/test/path' | DELETE || UPDATE
+ scenario | xpath | actionInRequest || expectedAction
+ 'empty xpath' | '' | CREATE_ACTION || CREATE_ACTION
+ 'root xpath and create action' | '/' | CREATE_ACTION || CREATE_ACTION
+ 'root xpath and replace action' | '/' | REPLACE_ACTION || REPLACE_ACTION
+ 'root xpath and remove action' | '/' | REMOVE_ACTION || REMOVE_ACTION
+ 'not root xpath and replace action' | 'test' | REPLACE_ACTION || REPLACE_ACTION
+ 'root node xpath and create action' | '/test' | CREATE_ACTION || CREATE_ACTION
+ 'non root node xpath and replace action' | '/test/path' | CREATE_ACTION || CREATE_ACTION
+ 'non root node xpath and remove action' | '/test/path' | REMOVE_ACTION || REMOVE_ACTION
}
- def 'Send cps update event when no timestamp provided.'() {
- given: 'an anchor, operation and null timestamp'
+ def 'Send cps event when no timestamp provided.'() {
+ given: 'an anchor'
def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
- def observedTimestamp = null
and: 'notificationsEnabled is true'
objectUnderTest.notificationsEnabled = true
and: 'cpsChangeEventNotificationsEnabled is true'
objectUnderTest.cpsChangeEventNotificationsEnabled = true
- when: 'service is called to send data update event'
- objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE, observedTimestamp)
+ when: 'service is called to send data event'
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null)
then: 'the event is sent'
1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
}
- def 'Enabling and disabling sending cps update events.'() {
- given: 'a different anchor'
+ def 'Enabling and disabling sending cps events.'() {
+ given: 'an anchor'
def anchor = new Anchor('anchor02', 'some dataspace', 'some schema');
and: 'notificationsEnabled is #notificationsEnabled'
objectUnderTest.notificationsEnabled = notificationsEnabled
objectUnderTest.cpsChangeEventNotificationsEnabled = cpsChangeEventNotificationsEnabled
and: 'notification service enabled is: #cpsNotificationServiceisNotificationEnabled'
mockCpsNotificationService.isNotificationEnabled(_, 'anchor02') >> cpsNotificationServiceisNotificationEnabled
- when: 'service is called to send data update event'
- objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE, null)
+ when: 'service is called to send data event'
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null)
then: 'the event is only sent when all related flags are true'
expectedCallsToProducer * mockEventsProducer.sendCloudEvent(*_)
where: 'the following flags are used'
import java.time.OffsetDateTime
-import static org.onap.cps.events.model.Data.Operation.DELETE
-
class CpsDataServiceImplSpec extends Specification {
def mockCpsDataPersistenceService = Mock(CpsDataPersistenceService)
def mockCpsAnchorService = Mock(CpsAnchorService)
1 * mockCpsValidator.validateNameCharacters(['anchor1', 'anchor2'])
and: 'the persistence service method is invoked with the correct parameters'
1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, _ as Collection<String>)
- and: 'a data update event is sent for each anchor'
- 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor1, '/', DELETE, observedTimestamp)
- 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor2, '/', DELETE, observedTimestamp)
}
def "Validating #scenario when dry run is enabled."() {