Merge "Add Notification support in cps core"
authorPriyank Maheshwari <priyank.maheshwari@est.tech>
Thu, 2 May 2024 08:58:44 +0000 (08:58 +0000)
committerGerrit Code Review <gerrit@onap.org>
Thu, 2 May 2024 08:58:44 +0000 (08:58 +0000)
cps-application/src/main/resources/application.yml
cps-events/src/main/resources/schemas/updatenode/cps-data-updated-event-schema-1.0.0.json
cps-service/pom.xml
cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java [new file with mode: 0644]
cps-service/src/main/java/org/onap/cps/events/CpsEvent.java [new file with mode: 0644]
cps-service/src/test/groovy/org/onap/cps/api/impl/CpsDataServiceImplSpec.groovy
cps-service/src/test/groovy/org/onap/cps/api/impl/E2ENetworkSliceSpec.groovy
cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsServiceSpec.groovy [new file with mode: 0644]

index 7873cc4..68dd31b 100644 (file)
@@ -2,6 +2,7 @@
 #  Copyright (C) 2021 Pantheon.tech
 #  Modifications Copyright (C) 2021-2022 Bell Canada
 #  Modifications Copyright (C) 2021-2024 Nordix Foundation
+#  Modifications Copyright (C) 2024 TechMahindra Ltd
 #  ================================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -113,7 +114,9 @@ app:
             topic: ${DMI_CM_EVENTS_TOPIC:dmi-cm-events}
         device-heartbeat:
             topic: ${DMI_DEVICE_HEARTBEAT_TOPIC:dmi-device-heartbeat}
-
+    cps:
+        data-updated:
+            topic: ${CPS_CHANGE_EVENT_TOPIC:cps-data-updated-events}
 
 notification:
     enabled: true
@@ -138,8 +141,6 @@ springdoc:
             - name: cps-ncmp-inventory
               url: /api-docs/cps-ncmp/openapi-inventory.yaml
 
-
-
 security:
     # comma-separated uri patterns which do not require authorization
     permit-uri: /actuator/**,/swagger-ui.html,/swagger-ui/**,/swagger-resources/**,/api-docs/**,/v3/api-docs/**
index 18f83cc..a3eaf63 100644 (file)
@@ -12,7 +12,7 @@
           "type": "object",
           "properties": {
             "observedTimestamp": {
-              "description": "The timestamp when the data has been observed. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'. Ex: '2020-12-01T00:00:00.000+0000'  ",
+              "description": "The timestamp when the data has been observed. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'. Ex: '2024-02-12T09:35:46.143+0530'  ",
               "type": "string"
             },
             "dataspaceName": {
index cd7e209..647b2e4 100644 (file)
   <artifactId>cps-service</artifactId>
 
   <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-messaging</artifactId>
       <groupId>io.micrometer</groupId>
       <artifactId>micrometer-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.cloudevents</groupId>
+      <artifactId>cloudevents-json-jackson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.cloudevents</groupId>
+      <artifactId>cloudevents-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.cloudevents</groupId>
+      <artifactId>cloudevents-spring</artifactId>
+    </dependency>
     <dependency>
       <groupId>jakarta.validation</groupId>
       <artifactId>jakarta.validation-api</artifactId>
index b3f4227..f556f40 100644 (file)
@@ -3,7 +3,7 @@
  *  Copyright (C) 2021-2024 Nordix Foundation
  *  Modifications Copyright (C) 2020-2022 Bell Canada.
  *  Modifications Copyright (C) 2021 Pantheon.tech
- *  Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
+ *  Modifications Copyright (C) 2022-2024 TechMahindra Ltd.
  *  Modifications Copyright (C) 2022 Deutsche Telekom AG
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -39,6 +39,8 @@ import org.onap.cps.api.CpsAnchorService;
 import org.onap.cps.api.CpsDataService;
 import org.onap.cps.api.CpsDeltaService;
 import org.onap.cps.cpspath.parser.CpsPathUtil;
+import org.onap.cps.events.CpsDataUpdateEventsService;
+import org.onap.cps.events.model.Data.Operation;
 import org.onap.cps.spi.CpsDataPersistenceService;
 import org.onap.cps.spi.FetchDescendantsOption;
 import org.onap.cps.spi.exceptions.DataValidationException;
@@ -61,7 +63,9 @@ public class CpsDataServiceImpl implements CpsDataService {
     private static final long DEFAULT_LOCK_TIMEOUT_IN_MILLISECONDS = 300L;
 
     private final CpsDataPersistenceService cpsDataPersistenceService;
+    private final CpsDataUpdateEventsService cpsDataUpdateEventsService;
     private final CpsAnchorService cpsAnchorService;
+
     private final CpsValidator cpsValidator;
     private final YangParser yangParser;
     private final CpsDeltaService cpsDeltaService;
@@ -81,6 +85,7 @@ public class CpsDataServiceImpl implements CpsDataService {
         final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
         final Collection<DataNode> dataNodes = buildDataNodes(anchor, ROOT_NODE_XPATH, nodeData, contentType);
         cpsDataPersistenceService.storeDataNodes(dataspaceName, anchorName, dataNodes);
+        sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.CREATE, observedTimestamp);
     }
 
     @Override
@@ -99,6 +104,7 @@ public class CpsDataServiceImpl implements CpsDataService {
         final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, nodeData, contentType);
         cpsDataPersistenceService.addChildDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
+        sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.CREATE, observedTimestamp);
     }
 
     @Override
@@ -116,6 +122,7 @@ public class CpsDataServiceImpl implements CpsDataService {
             cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
                                                       listElementDataNodeCollection);
         }
+        sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
     }
 
     @Override
@@ -151,6 +158,7 @@ public class CpsDataServiceImpl implements CpsDataService {
         final Map<String, Map<String, Serializable>> xpathToUpdatedLeaves = dataNodesInPatch.stream()
                 .collect(Collectors.toMap(DataNode::getXpath, DataNode::getLeaves));
         cpsDataPersistenceService.batchUpdateDataLeaves(dataspaceName, anchorName, xpathToUpdatedLeaves);
+        sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
     }
 
     @Override
@@ -167,6 +175,7 @@ public class CpsDataServiceImpl implements CpsDataService {
         for (final DataNode dataNodeUpdate : dataNodeUpdates) {
             processDataNodeUpdate(anchor, dataNodeUpdate);
         }
+        sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
     }
 
     @Override
@@ -216,6 +225,7 @@ public class CpsDataServiceImpl implements CpsDataService {
         final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
         final Collection<DataNode> dataNodes = buildDataNodes(anchor, parentNodeXpath, jsonData, ContentType.JSON);
         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
+        sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
     }
 
     @Override
@@ -228,6 +238,8 @@ public class CpsDataServiceImpl implements CpsDataService {
         final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
         final Collection<DataNode> dataNodes = buildDataNodes(anchor, nodesJsonData);
         cpsDataPersistenceService.updateDataNodesAndDescendants(dataspaceName, anchorName, dataNodes);
+        nodesJsonData.keySet().forEach(nodeXpath ->
+                sendDataUpdatedEvent(anchor, nodeXpath, Operation.UPDATE, observedTimestamp));
     }
 
     @Override
@@ -248,7 +260,9 @@ public class CpsDataServiceImpl implements CpsDataService {
     public void replaceListContent(final String dataspaceName, final String anchorName, final String parentNodeXpath,
             final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
+        final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
         cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
+        sendDataUpdatedEvent(anchor, parentNodeXpath, Operation.UPDATE, observedTimestamp);
     }
 
     @Override
@@ -258,6 +272,8 @@ public class CpsDataServiceImpl implements CpsDataService {
                                final OffsetDateTime observedTimestamp) {
         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
         cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
+        final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
+        sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp);
     }
 
     @Override
@@ -267,8 +283,12 @@ public class CpsDataServiceImpl implements CpsDataService {
                                 final Collection<String> dataNodeXpaths, final OffsetDateTime observedTimestamp) {
         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName, dataNodeXpaths);
+        final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
+        dataNodeXpaths.forEach(dataNodeXpath ->
+                sendDataUpdatedEvent(anchor, dataNodeXpath, Operation.DELETE, observedTimestamp));
     }
 
+
     @Override
     @Timed(value = "cps.data.service.datanode.delete.anchor",
         description = "Time taken to delete all datanodes for an anchor")
@@ -276,6 +296,8 @@ public class CpsDataServiceImpl implements CpsDataService {
                                 final OffsetDateTime observedTimestamp) {
         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
+        final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
+        sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp);
     }
 
     @Override
@@ -286,6 +308,9 @@ public class CpsDataServiceImpl implements CpsDataService {
         cpsValidator.validateNameCharacters(dataspaceName);
         cpsValidator.validateNameCharacters(anchorNames);
         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorNames);
+        for (final Anchor anchor : cpsAnchorService.getAnchors(dataspaceName, anchorNames)) {
+            sendDataUpdatedEvent(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp);
+        }
     }
 
     @Override
@@ -295,6 +320,8 @@ public class CpsDataServiceImpl implements CpsDataService {
         final OffsetDateTime observedTimestamp) {
         cpsValidator.validateNameCharacters(dataspaceName, anchorName);
         cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
+        final Anchor anchor = cpsAnchorService.getAnchor(dataspaceName, anchorName);
+        sendDataUpdatedEvent(anchor, listNodeXpath, Operation.DELETE, observedTimestamp);
     }
 
     private Collection<DataNode> buildDataNodes(final Anchor anchor, final Map<String, String> nodesJsonData) {
@@ -345,4 +372,12 @@ public class CpsDataServiceImpl implements CpsDataService {
         }
     }
 
+    private void sendDataUpdatedEvent(final Anchor anchor, final String xpath,
+                                      final Operation operation, final OffsetDateTime observedTimestamp) {
+        try {
+            cpsDataUpdateEventsService.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp);
+        } catch (final Exception exception) {
+            log.error("Failed to send message to notification service", exception);
+        }
+    }
 }
diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsService.java
new file mode 100644 (file)
index 0000000..e3315c9
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 TechMahindra Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.events;
+
+import io.cloudevents.CloudEvent;
+import io.micrometer.core.annotation.Timed;
+import java.time.OffsetDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.events.model.CpsDataUpdatedEvent;
+import org.onap.cps.events.model.Data;
+import org.onap.cps.events.model.Data.Operation;
+import org.onap.cps.spi.model.Anchor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class CpsDataUpdateEventsService {
+
+    private final EventsPublisher<CpsDataUpdatedEvent> eventsPublisher;
+
+    @Value("${app.cps.data-updated.topic:cps-data-updated-events}")
+    private String topicName;
+
+    @Value("${notification.enabled:false}")
+    private boolean notificationsEnabled;
+
+    /**
+     * Publish the cps data update event with header to the public topic.
+     *
+     * @param anchor Anchor of the updated data
+     * @param xpath  xpath of the updated data
+     * @param operation operation performed on the data
+     * @param observedTimestamp timestamp when data was updated.
+     */
+    @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to publish Data Update event")
+    public void publishCpsDataUpdateEvent(final Anchor anchor, final String xpath,
+                                          final Operation operation, final OffsetDateTime observedTimestamp) {
+        if (notificationsEnabled) {
+            final CpsDataUpdatedEvent cpsDataUpdatedEvent = createCpsDataUpdatedEvent(anchor,
+                    observedTimestamp, xpath, operation);
+            final String updateEventId = anchor.getDataspaceName() + ":" + anchor.getName();
+            final Map<String, String> extensions = createUpdateEventExtensions(updateEventId);
+            final CloudEvent cpsDataUpdatedEventAsCloudEvent =
+                    CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent)
+                            .extensions(extensions).build().asCloudEvent();
+            eventsPublisher.publishCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
+        } else {
+            log.debug("Notifications disabled.");
+        }
+    }
+
+    private CpsDataUpdatedEvent createCpsDataUpdatedEvent(final Anchor anchor, final OffsetDateTime observedTimestamp,
+                                                          final String xpath,
+                                                          final Operation rootNodeOperation) {
+        final CpsDataUpdatedEvent cpsDataUpdatedEvent = new CpsDataUpdatedEvent();
+        final Data updateEventData = new Data();
+        updateEventData.setObservedTimestamp(observedTimestamp.toString());
+        updateEventData.setDataspaceName(anchor.getDataspaceName());
+        updateEventData.setAnchorName(anchor.getName());
+        updateEventData.setSchemaSetName(anchor.getSchemaSetName());
+        updateEventData.setOperation(getRootNodeOperation(xpath, rootNodeOperation));
+        updateEventData.setXpath(xpath);
+        cpsDataUpdatedEvent.setData(updateEventData);
+        return cpsDataUpdatedEvent;
+    }
+
+    private Map<String, String> createUpdateEventExtensions(final String eventKey) {
+        final Map<String, String> extensions = new HashMap<>();
+        extensions.put("correlationid", eventKey);
+        return extensions;
+    }
+
+    private Operation getRootNodeOperation(final String xpath, final Operation operation) {
+        return isRootXpath(xpath) || isRootContainerNodeXpath(xpath) ? operation : Operation.UPDATE;
+    }
+
+    private static boolean isRootXpath(final String xpath) {
+        return "/".equals(xpath) || "".equals(xpath);
+    }
+
+    private static boolean isRootContainerNodeXpath(final String xpath) {
+        return 0 == xpath.lastIndexOf('/');
+    }
+}
diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java b/cps-service/src/main/java/org/onap/cps/events/CpsEvent.java
new file mode 100644 (file)
index 0000000..c19abc1
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 TechMahindra Ltd.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.events;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+import lombok.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.cps.utils.JsonObjectMapper;
+
+@Builder
+public class CpsEvent {
+
+    private Object data;
+    private Map<String, String> extensions;
+    private String type;
+    @Builder.Default
+    private static final String CLOUD_EVENT_SPEC_VERSION_V1 = "1.0.0";
+    @Builder.Default
+    private static final String CLOUD_EVENT_SOURCE = "CPS";
+
+    private final JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper());
+
+    /**
+     * Creates ncmp cloud event with provided attributes.
+     *
+     * @return Cloud Event
+     */
+
+    public CloudEvent asCloudEvent() {
+        final CloudEventBuilder cloudEventBuilder = io.cloudevents.core.builder
+            .CloudEventBuilder.v1()
+            .withId(UUID.randomUUID().toString())
+            .withSource(URI.create(CLOUD_EVENT_SOURCE))
+            .withType(type)
+            .withDataSchema(URI.create("urn:cps:" + type + ":" + CLOUD_EVENT_SPEC_VERSION_V1))
+            .withData(jsonObjectMapper.asJsonBytes(data));
+        extensions.entrySet().stream()
+            .filter(extensionEntry -> StringUtils.isNotBlank(extensionEntry.getValue()))
+            .forEach(extensionEntry ->
+                    cloudEventBuilder.withExtension(extensionEntry.getKey(), extensionEntry.getValue()));
+        return cloudEventBuilder.build();
+    }
+}
index b2b2d7d..fcbfd05 100644 (file)
@@ -3,7 +3,7 @@
  *  Copyright (C) 2021-2024 Nordix Foundation
  *  Modifications Copyright (C) 2021 Pantheon.tech
  *  Modifications Copyright (C) 2021-2022 Bell Canada.
- *  Modifications Copyright (C) 2022-2023 TechMahindra Ltd.
+ *  Modifications Copyright (C) 2022-2024 TechMahindra Ltd.
  *  Modifications Copyright (C) 2022 Deutsche Telekom AG
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 
 package org.onap.cps.api.impl
 
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.core.read.ListAppender
 import org.onap.cps.TestUtils
 import org.onap.cps.api.CpsAnchorService
 import org.onap.cps.api.CpsDeltaService
+import org.onap.cps.events.CpsDataUpdateEventsService
 import org.onap.cps.spi.CpsDataPersistenceService
 import org.onap.cps.spi.FetchDescendantsOption
 import org.onap.cps.spi.exceptions.ConcurrencyException
@@ -41,6 +45,8 @@ import org.onap.cps.utils.YangParser
 import org.onap.cps.utils.YangParserHelper
 import org.onap.cps.yang.YangTextSchemaSourceSet
 import org.onap.cps.yang.YangTextSchemaSourceSetBuilder
+import org.slf4j.LoggerFactory
+import org.springframework.context.annotation.AnnotationConfigApplicationContext
 import spock.lang.Shared
 import spock.lang.Specification
 import java.time.OffsetDateTime
@@ -52,13 +58,28 @@ class CpsDataServiceImplSpec extends Specification {
     def mockCpsValidator = Mock(CpsValidator)
     def yangParser = new YangParser(new YangParserHelper(), mockYangTextSchemaSourceSetCache)
     def mockCpsDeltaService = Mock(CpsDeltaService);
+    def mockDataUpdateEventsService = Mock(CpsDataUpdateEventsService)
 
-    def objectUnderTest = new CpsDataServiceImpl(mockCpsDataPersistenceService, mockCpsAnchorService, mockCpsValidator, yangParser, mockCpsDeltaService)
+    def objectUnderTest = new CpsDataServiceImpl(mockCpsDataPersistenceService, mockDataUpdateEventsService, mockCpsAnchorService, mockCpsValidator, yangParser, mockCpsDeltaService)
+
+    def logger = (Logger) LoggerFactory.getLogger(objectUnderTest.class)
+    def loggingListAppender
+    def applicationContext = new AnnotationConfigApplicationContext()
 
     def setup() {
         mockCpsAnchorService.getAnchor(dataspaceName, anchorName) >> anchor
         mockCpsAnchorService.getAnchor(dataspaceName, ANCHOR_NAME_1) >> anchor1
         mockCpsAnchorService.getAnchor(dataspaceName, ANCHOR_NAME_2) >> anchor2
+        logger.setLevel(Level.DEBUG)
+        loggingListAppender = new ListAppender()
+        logger.addAppender(loggingListAppender)
+        loggingListAppender.start()
+        applicationContext.refresh()
+    }
+
+    void cleanup() {
+        ((Logger) LoggerFactory.getLogger(CpsDataServiceImpl.class)).detachAndStopAllAppenders()
+        applicationContext.close()
     }
 
     @Shared
@@ -459,6 +480,19 @@ class CpsDataServiceImplSpec extends Specification {
             1 * mockCpsDataPersistenceService.lockAnchor('some-sessionId', 'some-dataspaceName', 'some-anchorName', 250L)
     }
 
+    def 'Exception is thrown while publishing the notification.'(){
+        given: 'schema set for given anchor and dataspace references test-tree model'
+            setupSchemaSetMocks('test-tree.yang')
+        when: 'publisher set to throw an exception'
+            mockDataUpdateEventsService.publishCpsDataUpdateEvent(_, _, _, _) >> { throw new Exception("publishing failed")}
+        and: 'an update event is performed'
+            objectUnderTest.updateNodeLeaves(dataspaceName, anchorName, '/', '{"test-tree": {"branch": []}}', observedTimestamp)
+        then: 'the exception is not bubbled up'
+            noExceptionThrown()
+        and: "the exception message is logged"
+            def logs = loggingListAppender.list.toString()
+            assert logs.contains('Failed to send message to notification service')
+    }
     def setupSchemaSetMocks(String... yangResources) {
         def mockYangTextSchemaSourceSet = Mock(YangTextSchemaSourceSet)
         mockYangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName) >> mockYangTextSchemaSourceSet
index 140dfaa..57f2f8e 100755 (executable)
@@ -3,7 +3,7 @@
  * Copyright (C) 2021-2024 Nordix Foundation.\r
  * Modifications Copyright (C) 2021-2022 Bell Canada.\r
  * Modifications Copyright (C) 2021 Pantheon.tech\r
- * Modifications Copyright (C) 2022-2023 TechMahindra Ltd.\r
+ * Modifications Copyright (C) 2022-2024 TechMahindra Ltd.\r
  * ================================================================================\r
  * Licensed under the Apache License, Version 2.0 (the "License");\r
  * you may not use this file except in compliance with the License.\r
@@ -26,6 +26,7 @@ package org.onap.cps.api.impl
 import org.onap.cps.TestUtils\r
 import org.onap.cps.api.CpsAnchorService\r
 import org.onap.cps.api.CpsDeltaService\r
+import org.onap.cps.events.CpsDataUpdateEventsService\r
 import org.onap.cps.spi.CpsDataPersistenceService\r
 import org.onap.cps.spi.CpsModulePersistenceService\r
 import org.onap.cps.spi.model.Anchor\r
@@ -50,7 +51,8 @@ class E2ENetworkSliceSpec extends Specification {
     def cpsModuleServiceImpl = new CpsModuleServiceImpl(mockModuleStoreService,\r
             mockYangTextSchemaSourceSetCache, mockCpsAnchorService, mockCpsValidator,timedYangTextSchemaSourceSetBuilder)\r
 \r
-    def cpsDataServiceImpl = new CpsDataServiceImpl(mockDataStoreService, mockCpsAnchorService, mockCpsValidator, yangParser, mockCpsDeltaService)\r
+    def mockDataUpdateEventsService = Mock(CpsDataUpdateEventsService)\r
+    def cpsDataServiceImpl = new CpsDataServiceImpl(mockDataStoreService, mockDataUpdateEventsService, mockCpsAnchorService, mockCpsValidator, yangParser, mockCpsDeltaService)\r
 \r
     def dataspaceName = 'someDataspace'\r
     def anchorName = 'someAnchor'\r
diff --git a/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsServiceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsServiceSpec.groovy
new file mode 100644 (file)
index 0000000..81b2bf2
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 TechMahindra Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.events
+
+import static org.onap.cps.events.model.Data.Operation.CREATE
+import static org.onap.cps.events.model.Data.Operation.DELETE
+import static org.onap.cps.events.model.Data.Operation.UPDATE
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.jackson.PojoCloudEventDataMapper
+import org.onap.cps.events.model.CpsDataUpdatedEvent
+import org.onap.cps.events.model.Data
+import org.onap.cps.spi.model.Anchor
+import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.test.context.ContextConfiguration
+import spock.lang.Specification
+
+import java.time.OffsetDateTime
+
+@ContextConfiguration(classes = [ObjectMapper, JsonObjectMapper])
+class CpsDataUpdateEventsServiceSpec extends Specification {
+    def mockEventsPublisher = Mock(EventsPublisher)
+    def notificationsEnabled = true
+    def objectMapper = new ObjectMapper();
+
+    def objectUnderTest = new CpsDataUpdateEventsService(mockEventsPublisher)
+
+    def 'Create and Publish cps update event where events are #scenario'() {
+        given: 'an anchor, operation and observed timestamp'
+            def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
+            def operation = operationInRequest
+            def observedTimestamp = OffsetDateTime.now()
+        and: 'notificationsEnabled is #notificationsEnabled and it will be true as default'
+            objectUnderTest.notificationsEnabled = true
+        when: 'service is called to publish data update event'
+            objectUnderTest.topicName = "cps-core-event"
+            objectUnderTest.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp)
+        then: 'the event contains the required attributes'
+            1 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
+            args ->
+                {
+                    def cpsDataUpdatedEvent = (args[2] as CloudEvent)
+                    assert cpsDataUpdatedEvent.getExtension('correlationid') == 'dataspace01:anchor01'
+                    assert cpsDataUpdatedEvent.type == 'org.onap.cps.events.model.CpsDataUpdatedEvent'
+                    assert cpsDataUpdatedEvent.source.toString() == 'CPS'
+                    def actualEventOperation = CloudEventUtils.mapData(cpsDataUpdatedEvent, PojoCloudEventDataMapper.from(objectMapper, CpsDataUpdatedEvent.class)).getValue().data.operation
+                    assert actualEventOperation == expectedOperation
+                }
+            }
+        where: 'the following values are used'
+        scenario                                   | xpath        | operationInRequest  || expectedOperation
+        'empty xpath'                              | ''           | CREATE              || CREATE
+        'root xpath and create operation'          | '/'          | CREATE              || CREATE
+        'root xpath and update operation'          | '/'          | UPDATE              || UPDATE
+        'root xpath and delete operation'          | '/'          | DELETE              || DELETE
+        'not root xpath and update operation'      | 'test'       | UPDATE              || UPDATE
+        'root node xpath and create operation'     | '/test'      | CREATE              || CREATE
+        'non root node xpath and update operation' | '/test/path' | CREATE              || UPDATE
+        'non root node xpath and delete operation' | '/test/path' | DELETE              || UPDATE
+    }
+
+    def 'publish cps update event when notification service is disabled'() {
+        given: 'an anchor, operation and observed timestamp'
+            def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
+            def operation = CREATE
+            def observedTimestamp = OffsetDateTime.now()
+        and: 'notificationsEnabled is flase'
+            objectUnderTest.notificationsEnabled = false
+        when: 'service is called to publish data update event'
+            objectUnderTest.topicName = "cps-core-event"
+            objectUnderTest.publishCpsDataUpdateEvent(anchor, '/', operation, observedTimestamp)
+        then: 'the event contains the required attributes'
+            0 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
+    }
+}