data-updated:
change-event-notifications-enabled: ${CPS_CHANGE_EVENT_NOTIFICATIONS_ENABLED:false}
topic: ${CPS_CHANGE_EVENT_TOPIC:cps-data-updated-events}
+ delta-notification: ${CPS_DELTA_NOTIFICATION_ENABLED:false}
notification:
enabled: true
"$ref": "#/definitions/CpsDataUpdateEvent",
"definitions": {
"CpsDataUpdateEvent": {
- "description": "The payload for CPS data updated event.",
+ "description": "The payload for CPS data update event.",
"type": "object",
"javaType": "org.onap.cps.events.model.CpsDataUpdatedEvent",
"properties": {
"xpath": {
"description": "xpath of the modified content",
"type": "string"
+ },
+ "cloudEventData": {
+ "description": "Source and target data from delta report",
+ "type": "object",
+ "properties": {
+ "sourceData": {
+ "description": "Source data from delta report",
+ "type": "string",
+ "minLength": 1
+ },
+ "targetData": {
+ "description": "Target data from delta report",
+ "type": "string",
+ "minLength": 1
+ }
+ },
+ "required": ["source", "target"],
+ "additionalProperties": false
}
},
"required": [
import io.cloudevents.CloudEvent;
import io.micrometer.core.annotation.Timed;
import java.time.OffsetDateTime;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.CpsNotificationService;
import org.onap.cps.api.model.Anchor;
+import org.onap.cps.api.model.DeltaReport;
+import org.onap.cps.events.model.CloudEventData;
import org.onap.cps.events.model.CpsDataUpdatedEvent;
import org.onap.cps.events.model.EventPayload;
import org.onap.cps.utils.DateTimeUtility;
+import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
public class CpsDataUpdateEventsProducer {
private final EventProducer eventProducer;
-
+ private final JsonObjectMapper jsonObjectMapper;
private final CpsNotificationService cpsNotificationService;
@Value("${app.cps.data-updated.topic:cps-data-updated-events}")
* @param observedTimestamp timestamp when data was updated.
*/
@Timed(value = "cps.data.update.events.send", description = "Time taken to send Data Update event")
- public void sendCpsDataUpdateEvent(final Anchor anchor, final String xpath,
- final String action, final OffsetDateTime observedTimestamp) {
+ public void sendCpsDataUpdateEvent(final Anchor anchor, final String xpath, final String action,
+ final List<DeltaReport> deltaReports, final OffsetDateTime observedTimestamp) {
if (notificationsEnabled && cpsChangeEventNotificationsEnabled && isNotificationEnabledForAnchor(anchor)) {
- final CpsDataUpdatedEvent cpsDataUpdatedEvent =
- createCpsDataUpdatedEvent(anchor, observedTimestamp, xpath, action);
final String updateEventId = anchor.getDataspaceName() + ":" + anchor.getName();
- final Map<String, String> extensions = createUpdateEventExtensions(updateEventId);
- final CloudEvent cpsDataUpdatedEventAsCloudEvent =
- CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent)
- .extensions(extensions).build().asCloudEvent();
- eventProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
+ final Map<String, String> extensions = Map.of("correlationid", updateEventId);
+ if (!deltaReports.isEmpty()) {
+ final Collection<CpsDataUpdatedEvent> cpsDataUpdatedEvents =
+ createUpdatedEventsFromDeltaReport(anchor, observedTimestamp, deltaReports);
+ cpsDataUpdatedEvents.forEach(cpsDataUpdatedEvent ->
+ sendCpsDataUpdatedEvent(updateEventId, cpsDataUpdatedEvent, extensions));
+ } else {
+ final CpsDataUpdatedEvent cpsDataUpdatedEvent =
+ createCpsDataUpdatedEvent(anchor, observedTimestamp, xpath, action);
+ sendCpsDataUpdatedEvent(updateEventId, cpsDataUpdatedEvent, extensions);
+ }
} else {
log.debug("State of Overall Notifications : {} and Cps Change Event Notifications : {}",
- notificationsEnabled, cpsChangeEventNotificationsEnabled);
+ notificationsEnabled, cpsChangeEventNotificationsEnabled);
}
}
return cpsDataUpdatedEvent;
}
- private Map<String, String> createUpdateEventExtensions(final String eventKey) {
- final Map<String, String> extensions = new HashMap<>();
- extensions.put("correlationid", eventKey);
- return extensions;
+ private Collection<CpsDataUpdatedEvent> createUpdatedEventsFromDeltaReport(final Anchor anchor,
+ final OffsetDateTime observedTimestamp,
+ final List<DeltaReport> deltaReports) {
+ final Collection<CpsDataUpdatedEvent> cpsDataUpdatedEvents = new ArrayList<>();
+ for (final DeltaReport deltaReport : deltaReports) {
+ if (deltaReport.getSourceData() != null || deltaReport.getTargetData() != null) {
+ cpsDataUpdatedEvents.add(toCpsDataUpdatedEvent(anchor, observedTimestamp, deltaReport));
+ }
+ }
+ return cpsDataUpdatedEvents;
+ }
+
+ private void sendCpsDataUpdatedEvent(final String updateEventId,
+ final CpsDataUpdatedEvent cpsDataUpdatedEvent,
+ final Map<String, String> extensions) {
+ final CloudEvent cpsDataUpdatedEventAsCloudEvent =
+ CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent)
+ .extensions(extensions).build().asCloudEvent();
+ eventProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
+ }
+
+ private CpsDataUpdatedEvent toCpsDataUpdatedEvent(final Anchor anchor, final OffsetDateTime observedTimestamp,
+ final DeltaReport deltaReport) {
+ final CloudEventData cloudEventData = new CloudEventData();
+ cloudEventData.setSourceData(jsonObjectMapper.asJsonString(deltaReport.getSourceData()));
+ cloudEventData.setTargetData(jsonObjectMapper.asJsonString(deltaReport.getTargetData()));
+ final EventPayload updateEventData = new EventPayload();
+ updateEventData.setObservedTimestamp(DateTimeUtility.toString(observedTimestamp));
+ updateEventData.setDataspaceName(anchor.getDataspaceName());
+ updateEventData.setAnchorName(anchor.getName());
+ updateEventData.setSchemaSetName(anchor.getSchemaSetName());
+ updateEventData.setXpath(deltaReport.getXpath());
+ updateEventData.setAction(fromValue(deltaReport.getAction()));
+ updateEventData.setCloudEventData(cloudEventData);
+
+ final CpsDataUpdatedEvent cpsDataUpdatedEvent = new CpsDataUpdatedEvent();
+ cpsDataUpdatedEvent.setEventPayload(updateEventData);
+ return cpsDataUpdatedEvent;
}
}
package org.onap.cps.impl;
+import static org.onap.cps.api.parameters.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS;
import static org.onap.cps.cpspath.parser.CpsPathUtil.NO_PARENT_PATH;
import static org.onap.cps.cpspath.parser.CpsPathUtil.ROOT_NODE_XPATH;
import static org.onap.cps.cpspath.parser.CpsPathUtil.isPathToListElement;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.cps.api.DataNodeFactory;
import org.onap.cps.api.model.Anchor;
import org.onap.cps.api.model.DataNode;
+import org.onap.cps.api.model.DeltaReport;
import org.onap.cps.api.parameters.FetchDescendantsOption;
import org.onap.cps.cpspath.parser.CpsPathUtil;
import org.onap.cps.events.CpsDataUpdateEventsProducer;
import org.onap.cps.utils.ContentType;
import org.onap.cps.utils.CpsValidator;
import org.onap.cps.utils.YangParser;
+import org.onap.cps.utils.deltareport.GroupedDeltaReportGenerator;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
private static final String CREATE_ACTION = CREATE.value();
private static final String REMOVE_ACTION = REMOVE.value();
private static final String REPLACE_ACTION = REPLACE.value();
+ private static final List<DeltaReport> NO_DELTA_REPORTS = Collections.emptyList();
private final CpsDataPersistenceService cpsDataPersistenceService;
private final CpsDataUpdateEventsProducer cpsDataUpdateEventsProducer;
private final DataNodeFactory dataNodeFactory;
private final CpsValidator cpsValidator;
private final YangParser yangParser;
+ private final GroupedDeltaReportGenerator groupedDeltaReportGenerator;
+
+ @Value("${app.cps.data-updated.delta-notification:false}")
+ private boolean deltaNotificationEnabled;
@Override
public void saveData(final String dataspaceName, final String anchorName, final String nodeData,
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = dataNodeFactory
.createDataNodesWithAnchorParentXpathAndNodeData(anchor, ROOT_NODE_XPATH, nodeData, contentType);
+ final List<DeltaReport> deltaReports =
+ generateDeltaReports(dataspaceName, anchorName, ROOT_NODE_XPATH, dataNodes);
cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
- sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, CREATE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, CREATE_ACTION, deltaReports, observedTimestamp);
}
@Override
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = dataNodeFactory
.createDataNodesWithAnchorParentXpathAndNodeData(anchor, parentNodeXpath, nodeData, contentType);
+ final List<DeltaReport> deltaReports =
+ generateDeltaReports(dataspaceName, anchorName, parentNodeXpath, dataNodes);
cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- sendDataUpdatedEvent(anchor, parentNodeXpath, CREATE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, CREATE_ACTION, deltaReports, observedTimestamp);
}
@Override
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> listElementDataNodeCollection = dataNodeFactory
.createDataNodesWithAnchorParentXpathAndNodeData(anchor, parentNodeXpath, nodeData, contentType);
+ final List<DeltaReport> deltaReports =
+ generateDeltaReports(dataspaceName, anchorName, parentNodeXpath, listElementDataNodeCollection);
if (ROOT_NODE_XPATH.equals(parentNodeXpath)) {
cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, listElementDataNodeCollection);
} else {
cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
listElementDataNodeCollection);
}
- sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, deltaReports, observedTimestamp);
}
@Override
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodesInPatch = dataNodeFactory
.createDataNodesWithAnchorParentXpathAndNodeData(anchor, parentNodeXpath, nodeData, contentType);
+ final List<DeltaReport> deltaReports =
+ generateDeltaReports(dataspaceName, anchorName, parentNodeXpath, dataNodesInPatch);
final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
.collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
- sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, deltaReports, observedTimestamp);
}
@Override
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodeUpdates = dataNodeFactory
.createDataNodesWithAnchorParentXpathAndNodeData(anchor, parentNodeXpath, dataNodeUpdatesAsJson, JSON);
+ final List<DeltaReport> deltaReports =
+ generateDeltaReports(dataspaceName, anchorName, parentNodeXpath, dataNodeUpdates);
for (final DataNode dataNodeUpdate : dataNodeUpdates) {
processDataNodeUpdate(anchor, dataNodeUpdate);
}
- sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, deltaReports, observedTimestamp);
}
@Override
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
final Collection<DataNode> dataNodes = dataNodeFactory
.createDataNodesWithAnchorParentXpathAndNodeData(anchor, parentNodeXpath, nodeData, contentType);
+ final List<DeltaReport> deltaReports =
+ generateDeltaReports(dataspaceName, anchorName, parentNodeXpath, dataNodes);
if (ROOT_NODE_XPATH.equals(parentNodeXpath) || !isPathToListElement(parentNodeXpath)) {
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
} else {
cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
}
- sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, deltaReports, observedTimestamp);
}
@Override
.createDataNodesWithAnchorAndXpathToNodeData(anchor, nodeDataPerParentNodeXPath, contentType);
cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
nodeDataPerParentNodeXPath.keySet().forEach(nodeXpath ->
- sendDataUpdatedEvent(anchor, nodeXpath, REPLACE_ACTION, observedTimestamp));
+ sendDataUpdatedEvent(anchor, nodeXpath, REPLACE_ACTION, NO_DELTA_REPORTS, observedTimestamp));
}
@Override
final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
+ final List<DeltaReport> deltaReports =
+ generateDeltaReports(dataspaceName, anchorName, parentNodeXpath, dataNodes);
cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, parentNodeXpath, REPLACE_ACTION, deltaReports, observedTimestamp);
}
@Override
public void deleteDataNode(final String dataspaceName, final String anchorName, final String dataNodeXpath,
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
+ final List<DeltaReport> deltaReports = generateDeltaReports(dataspaceName, anchorName, dataNodeXpath,
+ Collections.emptyList());
cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
- sendDataUpdatedEvent(anchor, dataNodeXpath, REMOVE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, dataNodeXpath, REMOVE_ACTION, deltaReports, observedTimestamp);
}
@Override
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
dataNodeXpaths.forEach(dataNodeXpath ->
- sendDataUpdatedEvent(anchor, dataNodeXpath, REMOVE_ACTION, observedTimestamp));
+ sendDataUpdatedEvent(anchor, dataNodeXpath, REMOVE_ACTION, NO_DELTA_REPORTS, observedTimestamp));
}
public void deleteDataNodes(final String dataspaceName, final String anchorName,
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
+ final List<DeltaReport> deltaReports = generateDeltaReports(dataspaceName, anchorName, ROOT_NODE_XPATH,
+ Collections.emptyList());
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
- sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, REMOVE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, REMOVE_ACTION, deltaReports, observedTimestamp);
}
@Override
description = "Time taken to delete all data nodes for multiple anchors")
public void deleteDataNodes(final String dataspaceName, final Collection<String> anchorNames,
final OffsetDateTime observedTimestamp) {
+ final boolean deltaNotification = false;
cpsValidator.validateNameCharacters(dataspaceName);
cpsValidator.validateNameCharacters(anchorNames);
cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
for (final Anchor anchor : cpsAnchorService.getAnchors(dataspaceName, anchorNames)) {
- sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, REMOVE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, REMOVE_ACTION, NO_DELTA_REPORTS, observedTimestamp);
}
}
public void deleteListOrListElement(final String dataspaceName, final String anchorName, final String listNodeXpath,
final OffsetDateTime observedTimestamp) {
cpsValidator.validateNameCharacters(dataspaceName, anchorName);
+ final List<DeltaReport> deltaReports = generateDeltaReports(dataspaceName, anchorName, listNodeXpath,
+ Collections.emptyList());
cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
- sendDataUpdatedEvent(anchor, listNodeXpath, REMOVE_ACTION, observedTimestamp);
+ sendDataUpdatedEvent(anchor, listNodeXpath, REMOVE_ACTION, deltaReports, observedTimestamp);
}
@Override
}
}
+ private List<DeltaReport> generateDeltaReports(final String dataspaceName, final String anchorName,
+ final String xpath, final Collection<DataNode> targetDataNodes) {
+ if (deltaNotificationEnabled) {
+ final Collection<DataNode> sourceDataNodes = getDataNodesForMultipleXpaths(dataspaceName, anchorName,
+ Collections.singletonList(xpath), INCLUDE_ALL_DESCENDANTS);
+ return groupedDeltaReportGenerator.createCondensedDeltaReports(sourceDataNodes, targetDataNodes);
+ }
+ return NO_DELTA_REPORTS;
+ }
+
private void sendDataUpdatedEvent(final Anchor anchor,
final String xpath,
final String action,
+ final List<DeltaReport> deltaReports,
final OffsetDateTime observedTimestamp) {
try {
- cpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, xpath, action, observedTimestamp);
+ cpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, xpath, action, deltaReports, observedTimestamp);
} catch (final Exception exception) {
log.error("Failed to send message to notification service", exception);
}
import org.onap.cps.api.CpsNotificationService
import org.onap.cps.api.model.Anchor
import org.onap.cps.events.model.CpsDataUpdatedEvent
+import org.onap.cps.impl.DeltaReportBuilder
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.test.context.ContextConfiguration
import spock.lang.Specification
def mockEventProducer = Mock(EventProducer)
def objectMapper = new ObjectMapper();
def mockCpsNotificationService = Mock(CpsNotificationService)
+ def jsonObjectMapper = new JsonObjectMapper(objectMapper)
- def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventProducer, mockCpsNotificationService)
+ def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventProducer, jsonObjectMapper, mockCpsNotificationService)
def setup() {
mockCpsNotificationService.isNotificationEnabled('dataspace01', 'anchor01') >> true
objectUnderTest.topicName = 'cps-core-event'
}
+ static def deltaReport = []
+
def 'Create and send cps event with #scenario.'() {
given: 'an anchor'
- def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
+ def anchor = new Anchor('anchor01', 'dataspace01', 'schema01')
and: 'notificationsEnabled is #notificationsEnabled and it will be true as default'
objectUnderTest.notificationsEnabled = true
and: 'cpsChangeEventNotificationsEnabled is also true'
objectUnderTest.cpsChangeEventNotificationsEnabled = true
when: 'service is called to send data update event'
- objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, actionInRequest, OffsetDateTime.now())
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, actionInRequest, deltaReport, OffsetDateTime.now())
then: 'the event contains the required attributes'
1 * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
- args ->
- {
- def cpsDataUpdatedEvent = (args[2] as CloudEvent)
- assert cpsDataUpdatedEvent.getExtension('correlationid') == 'dataspace01:anchor01'
- assert cpsDataUpdatedEvent.type == 'org.onap.cps.events.model.CpsDataUpdatedEvent'
- assert cpsDataUpdatedEvent.source.toString() == 'CPS'
- def actualEventOperation = CloudEventUtils.mapData(cpsDataUpdatedEvent, PojoCloudEventDataMapper.from(objectMapper, CpsDataUpdatedEvent.class)).getValue().eventPayload.action.value()
- assert actualEventOperation == expectedAction
- }
+ args ->
+ {
+ def cpsDataUpdatedEvent = (args[2] as CloudEvent)
+ assert cpsDataUpdatedEvent.getExtension('correlationid') == 'dataspace01:anchor01'
+ assert cpsDataUpdatedEvent.type == 'org.onap.cps.events.model.CpsDataUpdatedEvent'
+ assert cpsDataUpdatedEvent.source.toString() == 'CPS'
+ def actualEventOperation = CloudEventUtils.mapData(cpsDataUpdatedEvent, PojoCloudEventDataMapper.from(objectMapper, CpsDataUpdatedEvent.class)).getValue().eventPayload.action.value()
+ assert actualEventOperation == expectedAction
+ }
}
where: 'the following values are used'
scenario | xpath | actionInRequest || expectedAction
def 'Send cps event when no timestamp provided.'() {
given: 'an anchor'
- def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
+ def anchor = new Anchor('anchor01', 'dataspace01', 'schema01')
and: 'notificationsEnabled is true'
objectUnderTest.notificationsEnabled = true
and: 'cpsChangeEventNotificationsEnabled is true'
objectUnderTest.cpsChangeEventNotificationsEnabled = true
when: 'service is called to send data event'
- objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null)
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, deltaReport, null)
then: 'the event is sent'
1 * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
}
def 'Enabling and disabling sending cps events.'() {
given: 'an anchor'
- def anchor = new Anchor('anchor02', 'some dataspace', 'some schema');
+ def anchor = new Anchor('anchor02', 'some dataspace', 'some schema')
and: 'notificationsEnabled is #notificationsEnabled'
objectUnderTest.notificationsEnabled = notificationsEnabled
and: 'cpsChangeEventNotificationsEnabled is #cpsChangeEventNotificationsEnabled'
and: 'notification service enabled is: #cpsNotificationServiceisNotificationEnabled'
mockCpsNotificationService.isNotificationEnabled(_, 'anchor02') >> cpsNotificationServiceisNotificationEnabled
when: 'service is called to send data event'
- objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null)
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, deltaReport, OffsetDateTime.now())
then: 'the event is only sent when all related flags are true'
expectedCallsToProducer * mockEventProducer.sendCloudEvent(*_)
where: 'the following flags are used'
- notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled || expectedCallsToProducer
- false | true | true || 0
- true | false | true || 0
- true | true | false || 0
- true | true | true || 1
+ notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled || expectedCallsToProducer
+ false | true | true || 0
+ true | false | true || 0
+ true | true | false || 0
+ true | true | true || 1
+ }
+
+ def 'Sending CPS event with delta report'() {
+ given: 'an anchor'
+ def anchor = new Anchor('anchor01', 'dataspace01', 'schema01')
+ and: 'general notifications and cps change event notifications are enabled'
+ objectUnderTest.notificationsEnabled = true
+ objectUnderTest.cpsChangeEventNotificationsEnabled = true
+ when: 'service is called to send data event'
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', REPLACE.value(), deltaReports, OffsetDateTime.now())
+ then: 'the cloud event producer is invoked and an event is sent for each entry in delta report'
+ deltaReports.forEach { deltaReport ->
+ expectedInvocationCount * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
+ args ->
+ {
+ def cpsDataUpdatedEvent = (args[2] as CloudEvent)
+ def eventPayload = CloudEventUtils.mapData(cpsDataUpdatedEvent, PojoCloudEventDataMapper.from(objectMapper, CpsDataUpdatedEvent.class)).getValue().eventPayload
+ assert eventPayload.action.value() == deltaReport.getAction()
+ assert eventPayload.xpath == deltaReport.xpath
+ assert deltaReport.action == expectedEventAction
+ }
+ }
+ }
+ where: 'the following values are used'
+ scenario | deltaReports || expectedInvocationCount | expectedEventAction
+ 'delta report with source data' | [new DeltaReportBuilder().actionRemove().withXpath('/bookstore').withSourceData(['categories': ['code': '4', 'name': 'Computing']]).build()] || 1 | REMOVE_ACTION
+ 'delta report with target data' | [new DeltaReportBuilder().actionCreate().withXpath('/bookstore').withTargetData(['categories': ['code': '4', 'name': 'Computing']]).build()] || 1 | CREATE_ACTION
+ 'delta report with no source or target data' | [new DeltaReportBuilder().build()] || 0 | null
+ 'delta report with source and target data' | [new DeltaReportBuilder().actionReplace().withXpath('/bookstore').withSourceData(['categories': ['code': '4', 'name': 'Computing']]).withTargetData(['categories': [['code': '4', 'name': 'Funny']]]).build()] || 1 | REPLACE_ACTION
}
-}
+ def 'Sending legacy CPS event when delta report is empty'() {
+ given: 'an anchor and empty delta reports'
+ def anchor = new Anchor('anchor01', 'dataspace01', 'schema01')
+ def emptyDeltaReports = []
+ and: 'general notifications and cps change event notifications are enabled'
+ objectUnderTest.notificationsEnabled = true
+ objectUnderTest.cpsChangeEventNotificationsEnabled = true
+ when: 'attempt to send data update event'
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, emptyDeltaReports, OffsetDateTime.now())
+ then: 'the cloud event producer is invoked once for legacy event'
+ 1 * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
+ args ->
+ {
+ def cpsDataUpdatedEvent = (args[2] as CloudEvent)
+ def eventPayload = CloudEventUtils.mapData(cpsDataUpdatedEvent, PojoCloudEventDataMapper.from(objectMapper, CpsDataUpdatedEvent.class)).getValue().eventPayload
+ assert eventPayload.action.value() == CREATE_ACTION
+ assert eventPayload.xpath == '/'
+ }
+ }
+ }
+}
\ No newline at end of file
import org.onap.cps.utils.CpsValidator
import org.onap.cps.utils.YangParser
import org.onap.cps.utils.YangParserHelper
+import org.onap.cps.utils.deltareport.GroupedDeltaReportGenerator
import org.onap.cps.yang.TimedYangTextSchemaSourceSetBuilder
import org.onap.cps.yang.YangTextSchemaSourceSet
import org.onap.cps.yang.YangTextSchemaSourceSetBuilder
def yangParser = new YangParser(new YangParserHelper(), mockYangTextSchemaSourceSetCache, mockTimedYangTextSchemaSourceSetBuilder)
def mockCpsDataUpdateEventsProducer = Mock(CpsDataUpdateEventsProducer)
def dataNodeFactory = new DataNodeFactoryImpl(yangParser)
+ def mockGroupedDeltaReportGenerator = Mock(GroupedDeltaReportGenerator)
def objectUnderTest = new CpsDataServiceImpl(mockCpsDataPersistenceService, mockCpsDataUpdateEventsProducer, mockCpsAnchorService,
- dataNodeFactory, mockCpsValidator, yangParser)
+ dataNodeFactory, mockCpsValidator, yangParser, mockGroupedDeltaReportGenerator)
def logger = (Logger) LoggerFactory.getLogger(objectUnderTest.class)
def loggingListAppender
given: 'schema set for given anchor and dataspace references test-tree model'
setupSchemaSetMocks('test-tree.yang')
when: 'producer throws an exception while sending event'
- mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(_, _, _, _) >> { throw new Exception("Sending failed")}
+ mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(*_) >> { throw new Exception("Sending failed")}
and: 'an update event is performed'
objectUnderTest.updateNodeLeaves(dataspaceName, anchorName, '/', '{"test-tree": {"branch": []}}', observedTimestamp, ContentType.JSON)
then: 'the exception is not bubbled up'
assert logs.contains('Failed to send message to notification service')
}
+ def 'Save data nodes with delta notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model, json data and a delta report'
+ setupSchemaSetMocks('test-tree.yang')
+ def jsonData = '{"test-tree": {"branch": []}}'
+ def deltaReports = [new DeltaReportBuilder().withXpath('/').actionCreate().withTargetData(['name': 'Right']).build()]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'save data method is invoked with json data'
+ objectUnderTest.saveData(dataspaceName, anchorName, jsonData, observedTimestamp)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with correct parameters'
+ 1 * mockCpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, _)
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/', 'create', { deltaReport ->
+ assert deltaReport.size() == 1
+ assert deltaReport[0].xpath == '/'
+ assert deltaReport[0].targetData == ['name': 'Right']
+ }, observedTimestamp)
+ }
+
+ def 'Save data nodes with xpath and delta report in notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model, json data and a delta report'
+ setupSchemaSetMocks('test-tree.yang')
+ def jsonData = '{"branch": [ { "name" : "Left" }]}'
+ def deltaReports = [new DeltaReportBuilder().withXpath('/test-tree').actionCreate().withTargetData(['name': 'Right']).build()]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'save data method is invoked with json data and xpath'
+ objectUnderTest.saveData(dataspaceName, anchorName, '/test-tree', jsonData, observedTimestamp)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with correct parameters'
+ 1 * mockCpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, '/test-tree', _)
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/test-tree', 'create', { deltaReport ->
+ assert deltaReport.size() == 1
+ assert deltaReport[0].xpath == '/test-tree'
+ assert deltaReport[0].targetData == ['name': 'Right']
+ }, observedTimestamp)
+ }
+
+ def 'Save list data node with delta report in notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model, json data and a delta report'
+ setupSchemaSetMocks('test-tree.yang')
+ def jsonData = '{"branch": [ { "name" : "Left" }]}'
+ def deltaReports = [new DeltaReportBuilder().withXpath('/test-tree/branch[@name=\'Left\']').actionCreate().withTargetData(['name': 'Left']).build()]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'save list data method is invoked with json data'
+ objectUnderTest.saveListElements(dataspaceName, anchorName, '/test-tree', jsonData, observedTimestamp, ContentType.JSON)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with correct parameters'
+ 1 * mockCpsDataPersistenceService.addListElements(dataspaceName, anchorName, '/test-tree', _)
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/test-tree', 'replace', { deltaReport ->
+ assert deltaReport.size() == 1
+ assert deltaReport[0].xpath == '/test-tree/branch[@name=\'Left\']'
+ assert deltaReport[0].targetData == ['name': 'Left']
+ }, observedTimestamp)
+ }
+
+ def 'Update data node leaves and descendants with delta report in notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model, json data and a delta report'
+ setupSchemaSetMocks('test-tree.yang')
+ def jsonData = '{"branch": [ { "name" : "Left" }]}'
+ def deltaReports = [new DeltaReportBuilder().withXpath('/test-tree/branch[@name=\'Left\']').actionReplace().withTargetData(['name': 'Left']).build()]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'update data method is invoked with json data'
+ objectUnderTest.updateNodeLeavesAndExistingDescendantLeaves(dataspaceName, anchorName, '/test-tree', jsonData, observedTimestamp)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with correct parameters'
+ 1 * mockCpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, _)
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/test-tree', 'replace', { deltaReport ->
+ assert deltaReport.size() == 1
+ assert deltaReport[0].xpath == '/test-tree/branch[@name=\'Left\']'
+ assert deltaReport[0].targetData == ['name': 'Left']
+ }, observedTimestamp)
+ }
+
+ def 'Update node leaves with delta report in notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model, json data and a delta report'
+ setupSchemaSetMocks('test-tree.yang')
+ def jsonData = '{"branch": [ { "name" : "Left" }]}'
+ def deltaReports = [new DeltaReportBuilder().withXpath('/test-tree/branch[@name=\'Left\']').actionReplace().withTargetData(['name': 'Left']).build()]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'update data method is invoked with json data'
+ objectUnderTest.updateNodeLeaves(dataspaceName, anchorName, '/test-tree', jsonData, observedTimestamp, ContentType.JSON)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with correct parameters'
+ 1 * mockCpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, _)
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/test-tree', 'replace', { deltaReport ->
+ assert deltaReport.size() == 1
+ assert deltaReport[0].xpath == '/test-tree/branch[@name=\'Left\']'
+ assert deltaReport[0].targetData == ['name': 'Left']
+ }, observedTimestamp)
+ }
+
+ def 'Update data nodes and descendants with delta report in notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model, json data and a delta report'
+ setupSchemaSetMocks('test-tree.yang')
+ def jsonData = '{"branch": [ { "name" : "Left" }]}'
+ def deltaReports = [new DeltaReportBuilder().withXpath('/test-tree/branch[@name=\'Left\']').actionReplace().withTargetData(['name': 'Left']).build()]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'update data method is invoked with json data'
+ objectUnderTest.updateDataNodeAndDescendants(dataspaceName, anchorName, '/test-tree', jsonData, observedTimestamp, ContentType.JSON)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with correct parameters'
+ 1 * mockCpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, _)
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/test-tree', 'replace', { deltaReport ->
+ assert deltaReport.size() == 1
+ assert deltaReport[0].xpath == '/test-tree/branch[@name=\'Left\']'
+ assert deltaReport[0].targetData == ['name': 'Left']
+ }, observedTimestamp)
+ }
+
+ def 'Replace list content with delta report in notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model, json data and a delta report'
+ setupSchemaSetMocks('test-tree.yang')
+ def jsonData = '{"branch": [ { "name" : "Left" }]}'
+ def deltaReports = [new DeltaReportBuilder().withXpath('/test-tree/branch[@name=\'Left\']').actionReplace().withTargetData(['name': 'Left']).build()]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'replace list content method is invoked with json data'
+ objectUnderTest.replaceListContent(dataspaceName, anchorName, '/test-tree', jsonData, observedTimestamp, ContentType.JSON)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with correct parameters'
+ 1 * mockCpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, '/test-tree', _)
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/test-tree', 'replace', { deltaReport ->
+ assert deltaReport.size() == 1
+ assert deltaReport[0].xpath == '/test-tree/branch[@name=\'Left\']'
+ assert deltaReport[0].targetData == ['name': 'Left']
+ }, observedTimestamp)
+ }
+
+ def 'Delete data node with delta report in notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model and a delta report'
+ setupSchemaSetMocks('test-tree.yang')
+ def deltaReports = [new DeltaReportBuilder().withXpath('/data-node').actionRemove().build()]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'delete data node method is invoked with correct parameters'
+ objectUnderTest.deleteDataNode(dataspaceName, anchorName, '/data-node', observedTimestamp)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with the correct parameters'
+ 1 * mockCpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, '/data-node')
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/data-node', 'remove', { deltaReport ->
+ assert deltaReport.size() == 1
+ assert deltaReport[0].xpath == '/data-node'
+ }, observedTimestamp)
+ }
+
+ def 'Delete list element with delta report in notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model and a delta report'
+ setupSchemaSetMocks('test-tree.yang')
+ def deltaReports = [new DeltaReportBuilder().withXpath('/test-tree/branch[@name="A"]').actionRemove().build()]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'delete list data method is invoked with correct parameters'
+ objectUnderTest.deleteListOrListElement(dataspaceName, anchorName, '/test-tree/branch[@name="A"]', observedTimestamp)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with the correct parameters'
+ 1 * mockCpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, '/test-tree/branch[@name="A"]')
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/test-tree/branch[@name="A"]', 'remove', { deltaReport ->
+ assert deltaReport.size() == 1
+ assert deltaReport[0].xpath == '/test-tree/branch[@name="A"]'
+ }, observedTimestamp)
+ }
+
+ def 'Delete data nodes with delta report in notifications enabled'() {
+ given: 'schema set for given anchor and dataspace references test-tree model and delta reports'
+ setupSchemaSetMocks('test-tree.yang')
+ def deltaReports = [
+ new DeltaReportBuilder().withXpath('/test-tree/branch[@name=\'A\']').actionRemove().build(),
+ new DeltaReportBuilder().withXpath('/test-tree/branch[@name=\'B\']').actionRemove().build()
+ ]
+ and: 'delta report in notifications is enabled'
+ def deltaNotificationEnabled = (objectUnderTest.deltaNotificationEnabled = true)
+ when: 'delete data nodes method is invoked with correct parameters'
+ objectUnderTest.deleteDataNodes(dataspaceName, anchorName, observedTimestamp)
+ then: 'the delta report generator returns delta reports'
+ mockGroupedDeltaReportGenerator.createCondensedDeltaReports(*_) >> deltaReports
+ and: 'the persistence service method is invoked with the correct parameters'
+ 1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName)
+ and: 'the event producer is invoked with correct parameters including the correct delta reports'
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, '/', 'remove', { deltaReport ->
+ assert deltaReport.size() == 2
+ assert deltaReport.collect { it.xpath }.containsAll(['/test-tree/branch[@name=\'A\']', '/test-tree/branch[@name=\'B\']'])
+ }, observedTimestamp)
+ }
+
def setupSchemaSetMocks(String... yangResources) {
def mockYangTextSchemaSourceSet = Mock(YangTextSchemaSourceSet)
mockYangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName) >> mockYangTextSchemaSourceSet
import org.onap.cps.utils.CpsValidator
import org.onap.cps.utils.YangParser
import org.onap.cps.utils.YangParserHelper
+import org.onap.cps.utils.deltareport.GroupedDeltaReportGenerator
import org.onap.cps.yang.TimedYangTextSchemaSourceSetBuilder
import org.onap.cps.yang.YangTextSchemaSourceSetBuilder
import spock.lang.Specification
def cpsModuleServiceImpl = new CpsModuleServiceImpl(mockCpsModulePersistenceService, mockYangTextSchemaSourceSetCache, mockCpsAnchorService, mockCpsValidator,timedYangTextSchemaSourceSetBuilder,yangParser)
def mockCpsDataUpdateEventsProducer = Mock(CpsDataUpdateEventsProducer)
def dataNodeFactory = new DataNodeFactoryImpl(yangParser)
- def cpsDataServiceImpl = new CpsDataServiceImpl(mockCpsDataPersistenceService, mockCpsDataUpdateEventsProducer, mockCpsAnchorService, dataNodeFactory, mockCpsValidator, yangParser)
+ def mockGroupedDeltaReportGenerator = Mock(GroupedDeltaReportGenerator)
+ def cpsDataServiceImpl = new CpsDataServiceImpl(mockCpsDataPersistenceService, mockCpsDataUpdateEventsProducer, mockCpsAnchorService, dataNodeFactory, mockCpsValidator, yangParser, mockGroupedDeltaReportGenerator)
def dataspaceName = 'someDataspace'
def anchorName = 'someAnchor'
def schemaSetName = 'someSchemaSet'
schema:
type: object
description: OK
+ "204":
+ content:
+ application/json:
+ schema:
+ example: my-resource
+ type: string
+ description: Created
"400":
content:
application/json: