Process Cm Avc Event in NCMP 96/141596/9
authormpriyank <priyank.maheshwari@est.tech>
Mon, 21 Jul 2025 11:21:46 +0000 (12:21 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Wed, 30 Jul 2025 16:09:00 +0000 (17:09 +0100)
- process the cm avc event when the source system is set as ONAP DMI
  Plugin in the header
- handle the create , update , patch and delete operation to the CPS
  Cache.
- Testware to support the write operation
- Usage of the RestConf to Cps Path utility in this patch

Issue-ID: CPS-2777
Change-Id: I43233cf52f411a5d3cc20b03d0ef6e6e50399aa7
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventService.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcOperationEnum.java [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventServiceSpec.groovy [new file with mode: 0644]
cps-service/src/main/java/org/onap/cps/utils/YangParser.java
cps-service/src/test/groovy/org/onap/cps/utils/YangParserSpec.groovy

index c8aec3e..f504207 100644 (file)
 
 package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
 
+import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
+
 import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
 import io.micrometer.core.annotation.Timed;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.onap.cps.events.EventsProducer;
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
+import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -41,25 +47,48 @@ import org.springframework.stereotype.Component;
 public class CmAvcEventConsumer {
 
 
+    private static final String CLOUD_EVENT_SOURCE_SYSTEM_HEADER_KEY = "ce_source";
+
     @Value("${app.ncmp.avc.cm-events-topic}")
     private String cmEventsTopicName;
 
     private final EventsProducer<CloudEvent> eventsProducer;
+    private final CmAvcEventService cmAvcEventService;
+    private final InventoryPersistence inventoryPersistence;
 
     /**
      * Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic.
      * The key from incoming record will be used as key for the target topic as well to preserve the message ordering.
+     * If event is coming from ONAP-DMI-PLUGIN then the event will also be processed by NCMP and the cps cache/database
+     * will be updated.
      *
      * @param cmAvcEventAsConsumerRecord Incoming raw consumer record
      */
     @KafkaListener(topics = "${app.dmi.cm-events.topic}",
             containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
     @Timed(value = "cps.ncmp.cmnotifications.consumeandforward", description = "Time taken to forward CM AVC events")
-    public void consumeAndForward(
-            final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
+    public void consumeAndForward(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
+        if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) {
+            processCmAvcEventChanges(cmAvcEventAsConsumerRecord);
+        }
         final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
         final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
         log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
         eventsProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
     }
+
+    private void processCmAvcEventChanges(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
+        final String cmHandleId = cmAvcEventAsConsumerRecord.key();
+        final Boolean dataSyncEnabled = inventoryPersistence.getCmHandleState(cmHandleId).getDataSyncEnabled();
+        if (Boolean.TRUE.equals(dataSyncEnabled)) {
+            final AvcEvent cmAvcEvent = toTargetEvent(cmAvcEventAsConsumerRecord.value(), AvcEvent.class);
+            log.debug("Event to be processed to update the cache with cmHandleId : {}", cmHandleId);
+            cmAvcEventService.processCmAvcEvent(cmHandleId, cmAvcEvent);
+        }
+    }
+
+    private boolean isEventFromOnapDmiPlugin(final Headers headers) {
+        final String sourceSystem = KafkaHeaders.getParsedKafkaHeader(headers, CLOUD_EVENT_SOURCE_SYSTEM_HEADER_KEY);
+        return sourceSystem != null && sourceSystem.equals("ONAP-DMI-PLUGIN");
+    }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventService.java
new file mode 100644 (file)
index 0000000..061fe56
--- /dev/null
@@ -0,0 +1,151 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
+
+import static org.onap.cps.cpspath.parser.CpsPathUtil.NO_PARENT_PATH;
+import static org.onap.cps.cpspath.parser.CpsPathUtil.getNormalizedParentXpath;
+import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME;
+import static org.onap.cps.utils.ContentType.JSON;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.api.CpsAnchorService;
+import org.onap.cps.api.CpsDataService;
+import org.onap.cps.api.model.Anchor;
+import org.onap.cps.cpspath.parser.CpsPathUtil;
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
+import org.onap.cps.ncmp.events.avc1_0_0.Edit;
+import org.onap.cps.utils.JsonObjectMapper;
+import org.onap.cps.utils.YangParser;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class CmAvcEventService {
+
+    private static final OffsetDateTime NO_TIMESTAMP = null;
+
+    private final CpsDataService cpsDataService;
+    private final CpsAnchorService cpsAnchorService;
+    private final JsonObjectMapper jsonObjectMapper;
+    private final YangParser yangParser;
+
+    /**
+     * Process the incoming AvcEvent and apply the changes to the cache.
+     *
+     * @param cmHandleId cm handle identifier
+     * @param cmAvcEvent cm avc event
+     */
+    public void processCmAvcEvent(final String cmHandleId, final AvcEvent cmAvcEvent) {
+
+        final List<Edit> edits =
+                cmAvcEvent.getData().getPushChangeUpdate().getDatastoreChanges().getIetfYangPatchYangPatch().getEdit();
+
+        edits.forEach(
+                edit -> handleCmAvcEventOperation(CmAvcOperationEnum.fromValue(edit.getOperation()), cmHandleId, edit));
+    }
+
+    private void handleCmAvcEventOperation(final CmAvcOperationEnum cmAvcOperation, final String cmHandleId,
+            final Edit cmAvcEventEdit) {
+
+        log.info("Operation : {} requested for cmHandleId : {}", cmAvcOperation.getValue(), cmHandleId);
+
+        switch (cmAvcOperation) {
+            case CREATE:
+                handleCreate(cmHandleId, cmAvcEventEdit);
+                break;
+
+            case UPDATE:
+                handleUpdate(cmHandleId, cmAvcEventEdit);
+                break;
+
+            case PATCH:
+                handlePatch(cmHandleId, cmAvcEventEdit);
+                break;
+
+            case DELETE:
+                handleDelete(cmHandleId, cmAvcEventEdit);
+                break;
+
+            default:
+                log.error("Unhandled operation : {} for cmHandleId : {}", cmAvcOperation, cmHandleId);
+        }
+    }
+
+    private void handleCreate(final String cmHandleId, final Edit cmAvcEventEdit) {
+        log.debug("Handling create operation for cmHandleId : {}", cmHandleId);
+        final String jsonData = extractNodeData(cmAvcEventEdit);
+        cpsDataService.saveData(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, jsonData, NO_TIMESTAMP);
+    }
+
+    private void handleUpdate(final String cmHandleId, final Edit cmAvcEventEdit) {
+        final String jsonData = extractNodeData(cmAvcEventEdit);
+        final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget());
+        final String parentXpath = getNormalizedParentXpath(cpsPathFromRestConfStylePath);
+        log.debug("Handling update operation for cmHandleId : {} , cpsPath : {} and parent-xpath : {}", cmHandleId,
+                cpsPathFromRestConfStylePath, parentXpath);
+        cpsDataService.updateDataNodeAndDescendants(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
+                resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON);
+    }
+
+    private void handlePatch(final String cmHandleId, final Edit cmAvcEventEdit) {
+        final String jsonData = extractNodeData(cmAvcEventEdit);
+        final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget());
+        final String parentXpath = getNormalizedParentXpath(cpsPathFromRestConfStylePath);
+        log.debug("Handling patch operation for cmHandleId : {} , cpsPath : {} and parent-xpath : {}", cmHandleId,
+                cpsPathFromRestConfStylePath, parentXpath);
+        cpsDataService.updateNodeLeaves(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
+                resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON);
+
+    }
+
+    private void handleDelete(final String cmHandleId, final Edit cmAvcEventEdit) {
+        if (NO_PARENT_PATH.equals(cmAvcEventEdit.getTarget()) || cmAvcEventEdit.getTarget() == null) {
+            log.debug("Deleting all the entries for cmHandleId : {}", cmHandleId);
+            cpsDataService.deleteDataNodes(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, NO_TIMESTAMP);
+        } else {
+            final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget());
+            log.debug("Deleting data for xpath : {}", cpsPathFromRestConfStylePath);
+            cpsDataService.deleteDataNode(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
+                    cpsPathFromRestConfStylePath, NO_TIMESTAMP);
+
+        }
+    }
+
+    private String extractNodeData(final Edit cmAvcEventEdit) {
+        return jsonObjectMapper.convertJsonString(jsonObjectMapper.asJsonString(cmAvcEventEdit.getValue()),
+                String.class);
+    }
+
+    private String getCpsPath(final String cmHandleId, final String restConfStylePath) {
+        log.debug("Getting cps path from the restconfpath : {}", restConfStylePath);
+        final Anchor anchor = cpsAnchorService.getAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId);
+        return yangParser.getCpsPathFromRestConfStylePath(anchor, restConfStylePath);
+    }
+
+    private String resolveParentNodeXpath(final String parentNodeXpath) {
+        return parentNodeXpath.isEmpty() ? CpsPathUtil.ROOT_NODE_XPATH : parentNodeXpath;
+    }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcOperationEnum.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcOperationEnum.java
new file mode 100644 (file)
index 0000000..09c4921
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+/*
+ Enum for valid Cm Avc Event operations.
+ */
+public enum CmAvcOperationEnum {
+
+    READ("read"),
+
+    CREATE("create"),
+
+    UPDATE("update"),
+
+    PATCH("patch"),
+
+    DELETE("delete");
+
+    private final String value;
+
+    CmAvcOperationEnum(final String value) {
+        this.value = value;
+    }
+
+    @JsonValue
+    public String getValue() {
+        return value;
+    }
+
+    /**
+     * Returns the Operation Enum.
+     *
+     * @param value string operation
+     * @return CmAvcOperationEnum
+     */
+    public static CmAvcOperationEnum fromValue(final String value) {
+        for (final CmAvcOperationEnum b : CmAvcOperationEnum.values()) {
+            if (b.value.equals(value)) {
+                return b;
+            }
+        }
+        throw new IllegalArgumentException("Unexpected value '" + value + "'");
+    }
+
+}
index b0a8f20..9cf3684 100644 (file)
@@ -28,8 +28,9 @@ import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.cps.events.EventsProducer
+import org.onap.cps.ncmp.api.inventory.models.CompositeState
 import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
-import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
 import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
 import org.onap.cps.utils.JsonObjectMapper
 import org.spockframework.spring.SpringBean
@@ -37,8 +38,11 @@ import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
 import org.testcontainers.spock.Testcontainers
+
+import java.nio.charset.Charset
 import java.time.Duration
 
+import static org.onap.cps.ncmp.utils.TestUtils.getResourceFileContent
 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 
 @SpringBootTest(classes = [EventsProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
@@ -49,49 +53,84 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec {
     @SpringBean
     EventsProducer eventsProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
+    def mockCmAvcEventService = Mock(CmAvcEventService)
+    def mockInventoryPersistence = Mock(InventoryPersistence)
+
     @SpringBean
-    CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsProducer)
+    CmAvcEventConsumer cmAvcEventConsumer = new CmAvcEventConsumer(eventsProducer, mockCmAvcEventService, mockInventoryPersistence)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
-    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer))
-
-    def 'Consume and forward valid message'() {
-        given: 'consumer has a subscription on a topic'
-            def cmEventsTopicName = 'cm-events'
-            acvEventConsumer.cmEventsTopicName = cmEventsTopicName
-            cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
-        and: 'an event is sent'
-            def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
-            def testEventKey = 'sample-eventid-key'
-            def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
-            def testCloudEventSent = CloudEventBuilder.v1()
-                .withData(jsonObjectMapper.asJsonBytes(testEventSent))
-                .withId('sample-eventid')
-                .withType('sample-test-type')
-                .withSource(URI.create('sample-test-source'))
-                .withExtension('correlationid', 'test-cmhandle1').build()
-        and: 'event has header information'
-            def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent)
+    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('group for Test A', CloudEventDeserializer))
+
+    def testAvcEvent
+    def testEventKey
+
+    def setup() {
+        testEventKey = 'sample-key'
+        testAvcEvent = jsonObjectMapper.convertJsonString(getResourceFileContent('sampleAvcInputEvent.json'), AvcEvent.class)
+    }
+
+    def 'Test A : Consume and forward valid message'() {
+        given: 'a cloud event'
+            def testCloudEventSent = buildCloudEvent('sample-source', 'test-cmhandle1')
+        and: 'consumer has a subscription on the target topic for this test'
+            cmAvcEventConsumer.cmEventsTopicName = 'target-topic-for-Test-A'
+            cloudEventKafkaConsumer.subscribe([cmAvcEventConsumer.cmEventsTopicName])
+        and: 'event is wrapped in a consumer record with message key(cmHandleId) and value as cloud event'
+            def consumerRecordReceived = new ConsumerRecord<String, CloudEvent>(cmAvcEventConsumer.cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent)
         when: 'the event is consumed and forwarded to target topic'
-            acvEventConsumer.consumeAndForward(consumerRecord)
-        and: 'the target topic is polled'
-            def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
-        then: 'poll returns one record'
-            assert records.size() == 1
-        and: 'target record can be converted to AVC event'
-            def record = records.iterator().next()
-            def cloudEvent = record.value() as CloudEvent
-            def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class)
+            cmAvcEventConsumer.consumeAndForward(consumerRecordReceived)
+        then: 'the consumer record can be read from the target topic within 2 seconds'
+            def consumerRecordOnTargetTopic = cloudEventKafkaConsumer.poll(Duration.ofMillis(2000)).iterator().next()
         and: 'the target event has the same key as the source event to maintain the ordering in a partition'
-            assert record.key() == consumerRecord.key()
+            def cloudEventFromTargetTopic = consumerRecordOnTargetTopic.value() as CloudEvent
+            def avcEventFromTargetTopic = toTargetEvent(cloudEventFromTargetTopic, AvcEvent.class)
+            assert consumerRecordOnTargetTopic.key() == consumerRecordReceived.key()
         and: 'we have correct headers forwarded where correlation id matches'
-            assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOnTargetTopic.headers(), 'ce_correlationid') == 'test-cmhandle1'
         and: 'event id is same between consumed and forwarded'
-            assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') == 'sample-eventid'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOnTargetTopic.headers(), 'ce_id') == 'sample-eventid'
         and: 'the event payload still matches'
-            assert testEventSent == convertedAvcEvent
+            assert avcEventFromTargetTopic == testAvcEvent
     }
 
+    def 'Test B : Consume and process CM Avc Event when #scenario'() {
+        given: 'a cloud event is created(what we get from ONAP-DMI-PLUGIN)'
+            def sourceSystem = 'ONAP-DMI-PLUGIN'
+            def testCloudEventSent = buildCloudEvent(sourceSystem, 'some-cmhandle-id')
+        and: 'a separate topic for this test'
+            cmAvcEventConsumer.cmEventsTopicName =  'some-topic-for-Test-B'
+        and: 'inventory persistence service has #scenario'
+            def compositeState = new CompositeState(dataSyncEnabled: dataSyncFlag)
+            1 * mockInventoryPersistence.getCmHandleState(_) >> compositeState
+        and: 'event has source system as ONAP-DMI-PLUGIN and key(cmHandleId) and value as cloud event'
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmAvcEventConsumer.cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent)
+            consumerRecord.headers().add('ce_source', sourceSystem.getBytes(Charset.defaultCharset()))
+        when: 'the event is consumed'
+            cmAvcEventConsumer.consumeAndForward(consumerRecord)
+        then: 'cm avc event is processed for updating the cached data'
+            expectedCallToProcessCmAvcEvent * mockCmAvcEventService.processCmAvcEvent(testEventKey, _) >> { args ->
+                {
+                    assert args[1] instanceof AvcEvent
+                }
+            }
+        where: 'following scenarios are used'
+            scenario                  | dataSyncFlag || expectedCallToProcessCmAvcEvent
+            'data sync flag enabled'  | true         || 1
+            'data sync flag disabled' | false        || 0
+
+    }
+
+    def buildCloudEvent(sourceSystem, cmHandleId){
+
+        return CloudEventBuilder.v1()
+            .withData(jsonObjectMapper.asJsonBytes(testAvcEvent))
+            .withId('sample-eventid')
+            .withType('sample-test-type')
+            .withSource(URI.create(sourceSystem as String))
+            .withExtension('correlationid', cmHandleId).build()
+
+    }
 }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventServiceSpec.groovy
new file mode 100644 (file)
index 0000000..3822ba3
--- /dev/null
@@ -0,0 +1,148 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.onap.cps.api.CpsAnchorService
+import org.onap.cps.api.CpsDataService
+import org.onap.cps.api.model.Anchor
+import org.onap.cps.cpspath.parser.CpsPathUtil
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
+import org.onap.cps.ncmp.events.avc1_0_0.Data
+import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges
+import org.onap.cps.ncmp.events.avc1_0_0.Edit
+import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch
+import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate
+import org.onap.cps.utils.JsonObjectMapper
+import org.onap.cps.utils.YangParser
+import spock.lang.Specification
+
+class CmAvcEventServiceSpec extends Specification {
+
+    def mockCpsDataService = Mock(CpsDataService)
+    def mockCpsAnchorService = Mock(CpsAnchorService)
+    def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
+    def mockYangParser = Mock(YangParser)
+
+    def static NO_TIMESTAMP = null
+    def static NO_XPATH = ''
+
+    def objectUnderTest = new CmAvcEventService(
+        mockCpsDataService,
+        mockCpsAnchorService,
+        jsonObjectMapper,
+        mockYangParser
+    )
+
+    def cmHandleId = 'test-cmhandle-id'
+    def sampleJson = '{"some-data": "test-data"}'
+
+    def 'process CREATE operation'() {
+        given: 'An edit with CREATE operation'
+            def testAvcEventForCreate = testAvcEvent('create', NO_XPATH)
+        when: 'The AVC event is processed'
+            objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForCreate)
+        then: 'data is saved via cps service'
+            1 * mockCpsDataService.saveData(_, cmHandleId, sampleJson, NO_TIMESTAMP)
+    }
+
+    def 'Process UPDATE operation'() {
+        given: 'An edit with UPDATE operation and a valid target path'
+            def targetPath = '/test/path'
+            def anchor = Mock(Anchor)
+            mockCpsAnchorService.getAnchor(_, cmHandleId) >> anchor
+            mockYangParser.getCpsPathFromRestConfStylePath(anchor, targetPath) >> '/parsed/cps/path'
+            def testAvcEventForUpdate = testAvcEvent('update', targetPath)
+        when: 'The AVC event is processed'
+            objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForUpdate)
+        then: 'Data node and descendants are updated via CPS service'
+            1 * mockCpsDataService.updateDataNodeAndDescendants(_, cmHandleId, _, sampleJson, NO_TIMESTAMP, _)
+    }
+
+    def 'Process PATCH operation'() {
+        given: 'An edit with PATCH operation and a valid target path'
+            def targetPath = '/test/path'
+            def anchor = Mock(Anchor)
+            mockCpsAnchorService.getAnchor(_, cmHandleId) >> anchor
+            mockYangParser.getCpsPathFromRestConfStylePath(anchor, targetPath) >> '/parsed/cps/path'
+            def testAvcEventForPatch = testAvcEvent('patch', targetPath)
+        when: 'The AVC event is processed'
+            objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForPatch)
+        then: 'Node leaves are updated via CPS service'
+            1 * mockCpsDataService.updateNodeLeaves(_, cmHandleId, _, sampleJson, NO_TIMESTAMP, _)
+    }
+
+    def 'Process DELETE operation with target'() {
+        given: 'An edit with DELETE operation and a specific target path'
+            def targetPath = '/test/path'
+            def anchor = Mock(Anchor)
+            mockCpsAnchorService.getAnchor(_, cmHandleId) >> anchor
+            mockYangParser.getCpsPathFromRestConfStylePath(anchor, targetPath) >> '/parsed/cps/path'
+            def testAvcEventForDelete = testAvcEvent('delete', targetPath)
+        when: 'The AVC event is processed'
+            objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForDelete)
+        then: 'Data node is deleted at the given path'
+            1 * mockCpsDataService.deleteDataNode(_, cmHandleId, '/parsed/cps/path', NO_TIMESTAMP)
+    }
+
+    def 'Process DELETE operation with no target (delete all)'() {
+        given: 'An edit with DELETE operation and no target'
+            def testAvcEventForDelete = testAvcEvent('delete', NO_XPATH)
+        when: 'The AVC event is processed'
+            objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForDelete)
+        then: 'All data nodes for the cmHandleId are deleted'
+            1 * mockCpsDataService.deleteDataNodes(_, cmHandleId, NO_TIMESTAMP)
+        where: 'following targets are used'
+            target << [null, '']
+    }
+
+    def 'Resolve parent xpath correctly: #scenario'() {
+        expect: 'Parent xpath is resolved as expected'
+            assert objectUnderTest.resolveParentNodeXpath(inputXpath) == expectedXpath
+        where: 'following scenarios are used'
+            scenario                        | inputXpath   || expectedXpath
+            'when parentXpath is empty'     | ''           || CpsPathUtil.ROOT_NODE_XPATH
+            'when parentXpath is not empty' | '/test/path' || '/test/path'
+    }
+
+    def testAvcEvent(operation, targetXpath) {
+        new AvcEvent(
+            data: new Data(
+                pushChangeUpdate: new PushChangeUpdate(
+                    datastoreChanges: new DatastoreChanges(
+                        ietfYangPatchYangPatch: new IetfYangPatchYangPatch(
+                            patchId: 'test-patch-id',
+                            edit: [
+                                new Edit(
+                                    operation: operation,
+                                    editId: 'test-edit-id',
+                                    target: targetXpath,
+                                    value: sampleJson
+                                )
+                            ]
+                        )
+                    )
+                )
+            )
+        )
+    }
+
+}
index 5dfeb2f..a0846a7 100644 (file)
@@ -21,6 +21,7 @@
 
 package org.onap.cps.utils;
 
+import static org.onap.cps.utils.RestConfStylePathToCpsPathUtil.convertToCpsPath;
 import static org.onap.cps.utils.YangParserHelper.VALIDATE_AND_PARSE;
 import static org.onap.cps.utils.YangParserHelper.VALIDATE_ONLY;
 
@@ -48,20 +49,18 @@ public class YangParser {
     /**
      * Parses data into (normalized) ContainerNode according to schema context for the given anchor.
      *
-     * @param nodeData  data string
-     * @param anchor    the anchor for the node data
+     * @param nodeData data string
+     * @param anchor   the anchor for the node data
      * @return the NormalizedNode object
      */
     @Timed(value = "cps.utils.yangparser.nodedata.with.parent.parse",
-        description = "Time taken to parse node data with a parent")
-    public ContainerNode parseData(final ContentType contentType,
-                                   final String nodeData,
-                                   final Anchor anchor,
-                                   final String parentNodeXpath) {
+            description = "Time taken to parse node data with a parent")
+    public ContainerNode parseData(final ContentType contentType, final String nodeData, final Anchor anchor,
+            final String parentNodeXpath) {
         final SchemaContext schemaContext = getSchemaContext(anchor);
         try {
-            return yangParserHelper
-                    .parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_AND_PARSE);
+            return yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath,
+                    VALIDATE_AND_PARSE);
         } catch (final DataValidationException e) {
             invalidateCache(anchor);
         }
@@ -71,16 +70,14 @@ public class YangParser {
     /**
      * Parses data into (normalized) ContainerNode according to schema context for the given yang resource.
      *
-     * @param nodeData                    data string
-     * @param yangResourceContentPerName  yang resource content per name
-     * @return                            the NormalizedNode object
+     * @param nodeData                   data string
+     * @param yangResourceContentPerName yang resource content per name
+     * @return the NormalizedNode object
      */
     @Timed(value = "cps.utils.yangparser.nodedata.with.parent.with.yangResourceMap.parse",
             description = "Time taken to parse node data with a parent")
-    public ContainerNode parseData(final ContentType contentType,
-                                   final String nodeData,
-                                   final Map<String, String> yangResourceContentPerName,
-                                   final String parentNodeXpath) {
+    public ContainerNode parseData(final ContentType contentType, final String nodeData,
+            final Map<String, String> yangResourceContentPerName, final String parentNodeXpath) {
         final SchemaContext schemaContext = getSchemaContext(yangResourceContentPerName);
         return yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_AND_PARSE);
     }
@@ -88,16 +85,14 @@ public class YangParser {
     /**
      * Parses data to validate it, using the schema context for given anchor.
      *
-     * @param anchor                    the anchor used for node data validation
-     * @param parentNodeXpath           the xpath of the parent node
-     * @param nodeData                  JSON or XML data string to validate
-     * @param contentType               the content type of the data (e.g., JSON or XML)
-     * @throws DataValidationException  if validation fails
+     * @param anchor          the anchor used for node data validation
+     * @param parentNodeXpath the xpath of the parent node
+     * @param nodeData        JSON or XML data string to validate
+     * @param contentType     the content type of the data (e.g., JSON or XML)
+     * @throws DataValidationException if validation fails
      */
-    public void validateData(final ContentType contentType,
-                             final String nodeData,
-                             final Anchor anchor,
-                             final String parentNodeXpath) {
+    public void validateData(final ContentType contentType, final String nodeData, final Anchor anchor,
+            final String parentNodeXpath) {
         final SchemaContext schemaContext = getSchemaContext(anchor);
         try {
             yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_ONLY);
@@ -109,14 +104,26 @@ public class YangParser {
         yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_ONLY);
     }
 
+    /**
+     * Get Cps path from Restconf path.
+     *
+     * @param anchor            anchor
+     * @param restConfStylePath restconf path
+     * @return CpsPath
+     */
+    public String getCpsPathFromRestConfStylePath(final Anchor anchor, final String restConfStylePath) {
+        final SchemaContext schemaContext = getSchemaContext(anchor);
+        return convertToCpsPath(restConfStylePath, schemaContext);
+    }
+
     private SchemaContext getSchemaContext(final Anchor anchor) {
-        return yangTextSchemaSourceSetCache.get(anchor.getDataspaceName(),
-            anchor.getSchemaSetName()).getSchemaContext();
+        return yangTextSchemaSourceSetCache.get(anchor.getDataspaceName(), anchor.getSchemaSetName())
+                       .getSchemaContext();
     }
 
     private SchemaContext getSchemaContext(final Map<String, String> yangResourceContentPerName) {
         return timedYangTextSchemaSourceSetBuilder.getYangTextSchemaSourceSet(yangResourceContentPerName)
-                .getSchemaContext();
+                       .getSchemaContext();
     }
 
     private void invalidateCache(final Anchor anchor) {
index cb7a16c..a1a0b8c 100644 (file)
@@ -24,12 +24,14 @@ package org.onap.cps.utils
 import org.onap.cps.TestUtils
 import org.onap.cps.api.exceptions.DataValidationException
 import org.onap.cps.api.model.Anchor
+import org.onap.cps.impl.YangTextSchemaSourceSetCache
 import org.onap.cps.yang.TimedYangTextSchemaSourceSetBuilder
 import org.onap.cps.yang.YangTextSchemaSourceSet
+import org.opendaylight.yangtools.yang.common.QName
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode
+import org.opendaylight.yangtools.yang.model.api.ListSchemaNode
 import org.opendaylight.yangtools.yang.model.api.SchemaContext
 import spock.lang.Specification
-import org.onap.cps.impl.YangTextSchemaSourceSetCache
 
 class YangParserSpec extends Specification {
 
@@ -54,6 +56,25 @@ class YangParserSpec extends Specification {
         mockYangTextSchemaSourceSet.getSchemaContext() >> mockSchemaContext
     }
 
+    def 'Convert RestConf-style path to CPS path'() {
+        given: 'a RestConf-style path'
+            def restConfStylePath = '/bookstore:book=Book1'
+            def expectedCpsPath = '/book[@name=\'Book1\']'
+        and: 'a schema context that contains the matching node'
+            def mockedBookNode = Mock(ListSchemaNode) {
+                getQName() >> QName.create('bookstore', 'book')
+                getKeyDefinition() >> [QName.create('bookstore', 'name')]
+            }
+            mockSchemaContext.getChildNodes() >> [mockedBookNode]
+            mockedBookNode.getChildNodes() >> []
+        when: 'restconf style path is converted to cps path'
+            def result = objectUnderTest.getCpsPathFromRestConfStylePath(anchor, restConfStylePath)
+        then: 'the schema context is retrieved from the cache for the correct anchor'
+            1 * mockYangTextSchemaSourceSetCache.get('my dataspace', 'my schema') >> mockYangTextSchemaSourceSet
+        and: 'the CPS path is returned correctly'
+            assert result == expectedCpsPath
+    }
+
     def 'Parsing data.'() {
         given: 'the yang parser (utility) always returns a container node'
             mockYangParserHelper.parseData(ContentType.JSON, 'some json', mockSchemaContext, noParent, validateAndParse) >> containerNodeFromYangUtils