package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
+import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
+
import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
import io.micrometer.core.annotation.Timed;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
import org.onap.cps.events.EventsProducer;
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
+import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
public class CmAvcEventConsumer {
+ private static final String CLOUD_EVENT_SOURCE_SYSTEM_HEADER_KEY = "ce_source";
+
@Value("${app.ncmp.avc.cm-events-topic}")
private String cmEventsTopicName;
private final EventsProducer<CloudEvent> eventsProducer;
+ private final CmAvcEventService cmAvcEventService;
+ private final InventoryPersistence inventoryPersistence;
/**
* Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic.
* The key from incoming record will be used as key for the target topic as well to preserve the message ordering.
+ * If event is coming from ONAP-DMI-PLUGIN then the event will also be processed by NCMP and the cps cache/database
+ * will be updated.
*
* @param cmAvcEventAsConsumerRecord Incoming raw consumer record
*/
@KafkaListener(topics = "${app.dmi.cm-events.topic}",
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
@Timed(value = "cps.ncmp.cmnotifications.consumeandforward", description = "Time taken to forward CM AVC events")
- public void consumeAndForward(
- final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
+ public void consumeAndForward(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
+ if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) {
+ processCmAvcEventChanges(cmAvcEventAsConsumerRecord);
+ }
final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
eventsProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
}
+
+ private void processCmAvcEventChanges(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
+ final String cmHandleId = cmAvcEventAsConsumerRecord.key();
+ final Boolean dataSyncEnabled = inventoryPersistence.getCmHandleState(cmHandleId).getDataSyncEnabled();
+ if (Boolean.TRUE.equals(dataSyncEnabled)) {
+ final AvcEvent cmAvcEvent = toTargetEvent(cmAvcEventAsConsumerRecord.value(), AvcEvent.class);
+ log.debug("Event to be processed to update the cache with cmHandleId : {}", cmHandleId);
+ cmAvcEventService.processCmAvcEvent(cmHandleId, cmAvcEvent);
+ }
+ }
+
+ private boolean isEventFromOnapDmiPlugin(final Headers headers) {
+ final String sourceSystem = KafkaHeaders.getParsedKafkaHeader(headers, CLOUD_EVENT_SOURCE_SYSTEM_HEADER_KEY);
+ return sourceSystem != null && sourceSystem.equals("ONAP-DMI-PLUGIN");
+ }
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
+
+import static org.onap.cps.cpspath.parser.CpsPathUtil.NO_PARENT_PATH;
+import static org.onap.cps.cpspath.parser.CpsPathUtil.getNormalizedParentXpath;
+import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME;
+import static org.onap.cps.utils.ContentType.JSON;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.api.CpsAnchorService;
+import org.onap.cps.api.CpsDataService;
+import org.onap.cps.api.model.Anchor;
+import org.onap.cps.cpspath.parser.CpsPathUtil;
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
+import org.onap.cps.ncmp.events.avc1_0_0.Edit;
+import org.onap.cps.utils.JsonObjectMapper;
+import org.onap.cps.utils.YangParser;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class CmAvcEventService {
+
+ private static final OffsetDateTime NO_TIMESTAMP = null;
+
+ private final CpsDataService cpsDataService;
+ private final CpsAnchorService cpsAnchorService;
+ private final JsonObjectMapper jsonObjectMapper;
+ private final YangParser yangParser;
+
+ /**
+ * Process the incoming AvcEvent and apply the changes to the cache.
+ *
+ * @param cmHandleId cm handle identifier
+ * @param cmAvcEvent cm avc event
+ */
+ public void processCmAvcEvent(final String cmHandleId, final AvcEvent cmAvcEvent) {
+
+ final List<Edit> edits =
+ cmAvcEvent.getData().getPushChangeUpdate().getDatastoreChanges().getIetfYangPatchYangPatch().getEdit();
+
+ edits.forEach(
+ edit -> handleCmAvcEventOperation(CmAvcOperationEnum.fromValue(edit.getOperation()), cmHandleId, edit));
+ }
+
+ private void handleCmAvcEventOperation(final CmAvcOperationEnum cmAvcOperation, final String cmHandleId,
+ final Edit cmAvcEventEdit) {
+
+ log.info("Operation : {} requested for cmHandleId : {}", cmAvcOperation.getValue(), cmHandleId);
+
+ switch (cmAvcOperation) {
+ case CREATE:
+ handleCreate(cmHandleId, cmAvcEventEdit);
+ break;
+
+ case UPDATE:
+ handleUpdate(cmHandleId, cmAvcEventEdit);
+ break;
+
+ case PATCH:
+ handlePatch(cmHandleId, cmAvcEventEdit);
+ break;
+
+ case DELETE:
+ handleDelete(cmHandleId, cmAvcEventEdit);
+ break;
+
+ default:
+ log.error("Unhandled operation : {} for cmHandleId : {}", cmAvcOperation, cmHandleId);
+ }
+ }
+
+ private void handleCreate(final String cmHandleId, final Edit cmAvcEventEdit) {
+ log.debug("Handling create operation for cmHandleId : {}", cmHandleId);
+ final String jsonData = extractNodeData(cmAvcEventEdit);
+ cpsDataService.saveData(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, jsonData, NO_TIMESTAMP);
+ }
+
+ private void handleUpdate(final String cmHandleId, final Edit cmAvcEventEdit) {
+ final String jsonData = extractNodeData(cmAvcEventEdit);
+ final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget());
+ final String parentXpath = getNormalizedParentXpath(cpsPathFromRestConfStylePath);
+ log.debug("Handling update operation for cmHandleId : {} , cpsPath : {} and parent-xpath : {}", cmHandleId,
+ cpsPathFromRestConfStylePath, parentXpath);
+ cpsDataService.updateDataNodeAndDescendants(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
+ resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON);
+ }
+
+ private void handlePatch(final String cmHandleId, final Edit cmAvcEventEdit) {
+ final String jsonData = extractNodeData(cmAvcEventEdit);
+ final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget());
+ final String parentXpath = getNormalizedParentXpath(cpsPathFromRestConfStylePath);
+ log.debug("Handling patch operation for cmHandleId : {} , cpsPath : {} and parent-xpath : {}", cmHandleId,
+ cpsPathFromRestConfStylePath, parentXpath);
+ cpsDataService.updateNodeLeaves(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
+ resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON);
+
+ }
+
+ private void handleDelete(final String cmHandleId, final Edit cmAvcEventEdit) {
+ if (NO_PARENT_PATH.equals(cmAvcEventEdit.getTarget()) || cmAvcEventEdit.getTarget() == null) {
+ log.debug("Deleting all the entries for cmHandleId : {}", cmHandleId);
+ cpsDataService.deleteDataNodes(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, NO_TIMESTAMP);
+ } else {
+ final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget());
+ log.debug("Deleting data for xpath : {}", cpsPathFromRestConfStylePath);
+ cpsDataService.deleteDataNode(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId,
+ cpsPathFromRestConfStylePath, NO_TIMESTAMP);
+
+ }
+ }
+
+ private String extractNodeData(final Edit cmAvcEventEdit) {
+ return jsonObjectMapper.convertJsonString(jsonObjectMapper.asJsonString(cmAvcEventEdit.getValue()),
+ String.class);
+ }
+
+ private String getCpsPath(final String cmHandleId, final String restConfStylePath) {
+ log.debug("Getting cps path from the restconfpath : {}", restConfStylePath);
+ final Anchor anchor = cpsAnchorService.getAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId);
+ return yangParser.getCpsPathFromRestConfStylePath(anchor, restConfStylePath);
+ }
+
+ private String resolveParentNodeXpath(final String parentNodeXpath) {
+ return parentNodeXpath.isEmpty() ? CpsPathUtil.ROOT_NODE_XPATH : parentNodeXpath;
+ }
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+
+/*
+ Enum for valid Cm Avc Event operations.
+ */
+public enum CmAvcOperationEnum {
+
+ READ("read"),
+
+ CREATE("create"),
+
+ UPDATE("update"),
+
+ PATCH("patch"),
+
+ DELETE("delete");
+
+ private final String value;
+
+ CmAvcOperationEnum(final String value) {
+ this.value = value;
+ }
+
+ @JsonValue
+ public String getValue() {
+ return value;
+ }
+
+ /**
+ * Returns the Operation Enum.
+ *
+ * @param value string operation
+ * @return CmAvcOperationEnum
+ */
+ public static CmAvcOperationEnum fromValue(final String value) {
+ for (final CmAvcOperationEnum b : CmAvcOperationEnum.values()) {
+ if (b.value.equals(value)) {
+ return b;
+ }
+ }
+ throw new IllegalArgumentException("Unexpected value '" + value + "'");
+ }
+
+}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.cps.events.EventsProducer
+import org.onap.cps.ncmp.api.inventory.models.CompositeState
import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
-import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
import org.testcontainers.spock.Testcontainers
+
+import java.nio.charset.Charset
import java.time.Duration
+import static org.onap.cps.ncmp.utils.TestUtils.getResourceFileContent
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
@SpringBootTest(classes = [EventsProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
@SpringBean
EventsProducer eventsProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ def mockCmAvcEventService = Mock(CmAvcEventService)
+ def mockInventoryPersistence = Mock(InventoryPersistence)
+
@SpringBean
- CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsProducer)
+ CmAvcEventConsumer cmAvcEventConsumer = new CmAvcEventConsumer(eventsProducer, mockCmAvcEventService, mockInventoryPersistence)
@Autowired
JsonObjectMapper jsonObjectMapper
- def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer))
-
- def 'Consume and forward valid message'() {
- given: 'consumer has a subscription on a topic'
- def cmEventsTopicName = 'cm-events'
- acvEventConsumer.cmEventsTopicName = cmEventsTopicName
- cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
- and: 'an event is sent'
- def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
- def testEventKey = 'sample-eventid-key'
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
- def testCloudEventSent = CloudEventBuilder.v1()
- .withData(jsonObjectMapper.asJsonBytes(testEventSent))
- .withId('sample-eventid')
- .withType('sample-test-type')
- .withSource(URI.create('sample-test-source'))
- .withExtension('correlationid', 'test-cmhandle1').build()
- and: 'event has header information'
- def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent)
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('group for Test A', CloudEventDeserializer))
+
+ def testAvcEvent
+ def testEventKey
+
+ def setup() {
+ testEventKey = 'sample-key'
+ testAvcEvent = jsonObjectMapper.convertJsonString(getResourceFileContent('sampleAvcInputEvent.json'), AvcEvent.class)
+ }
+
+ def 'Test A : Consume and forward valid message'() {
+ given: 'a cloud event'
+ def testCloudEventSent = buildCloudEvent('sample-source', 'test-cmhandle1')
+ and: 'consumer has a subscription on the target topic for this test'
+ cmAvcEventConsumer.cmEventsTopicName = 'target-topic-for-Test-A'
+ cloudEventKafkaConsumer.subscribe([cmAvcEventConsumer.cmEventsTopicName])
+ and: 'event is wrapped in a consumer record with message key(cmHandleId) and value as cloud event'
+ def consumerRecordReceived = new ConsumerRecord<String, CloudEvent>(cmAvcEventConsumer.cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent)
when: 'the event is consumed and forwarded to target topic'
- acvEventConsumer.consumeAndForward(consumerRecord)
- and: 'the target topic is polled'
- def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
- then: 'poll returns one record'
- assert records.size() == 1
- and: 'target record can be converted to AVC event'
- def record = records.iterator().next()
- def cloudEvent = record.value() as CloudEvent
- def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class)
+ cmAvcEventConsumer.consumeAndForward(consumerRecordReceived)
+ then: 'the consumer record can be read from the target topic within 2 seconds'
+ def consumerRecordOnTargetTopic = cloudEventKafkaConsumer.poll(Duration.ofMillis(2000)).iterator().next()
and: 'the target event has the same key as the source event to maintain the ordering in a partition'
- assert record.key() == consumerRecord.key()
+ def cloudEventFromTargetTopic = consumerRecordOnTargetTopic.value() as CloudEvent
+ def avcEventFromTargetTopic = toTargetEvent(cloudEventFromTargetTopic, AvcEvent.class)
+ assert consumerRecordOnTargetTopic.key() == consumerRecordReceived.key()
and: 'we have correct headers forwarded where correlation id matches'
- assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOnTargetTopic.headers(), 'ce_correlationid') == 'test-cmhandle1'
and: 'event id is same between consumed and forwarded'
- assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') == 'sample-eventid'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOnTargetTopic.headers(), 'ce_id') == 'sample-eventid'
and: 'the event payload still matches'
- assert testEventSent == convertedAvcEvent
+ assert avcEventFromTargetTopic == testAvcEvent
}
+ def 'Test B : Consume and process CM Avc Event when #scenario'() {
+ given: 'a cloud event is created(what we get from ONAP-DMI-PLUGIN)'
+ def sourceSystem = 'ONAP-DMI-PLUGIN'
+ def testCloudEventSent = buildCloudEvent(sourceSystem, 'some-cmhandle-id')
+ and: 'a separate topic for this test'
+ cmAvcEventConsumer.cmEventsTopicName = 'some-topic-for-Test-B'
+ and: 'inventory persistence service has #scenario'
+ def compositeState = new CompositeState(dataSyncEnabled: dataSyncFlag)
+ 1 * mockInventoryPersistence.getCmHandleState(_) >> compositeState
+ and: 'event has source system as ONAP-DMI-PLUGIN and key(cmHandleId) and value as cloud event'
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmAvcEventConsumer.cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent)
+ consumerRecord.headers().add('ce_source', sourceSystem.getBytes(Charset.defaultCharset()))
+ when: 'the event is consumed'
+ cmAvcEventConsumer.consumeAndForward(consumerRecord)
+ then: 'cm avc event is processed for updating the cached data'
+ expectedCallToProcessCmAvcEvent * mockCmAvcEventService.processCmAvcEvent(testEventKey, _) >> { args ->
+ {
+ assert args[1] instanceof AvcEvent
+ }
+ }
+ where: 'following scenarios are used'
+ scenario | dataSyncFlag || expectedCallToProcessCmAvcEvent
+ 'data sync flag enabled' | true || 1
+ 'data sync flag disabled' | false || 0
+
+ }
+
+ def buildCloudEvent(sourceSystem, cmHandleId){
+
+ return CloudEventBuilder.v1()
+ .withData(jsonObjectMapper.asJsonBytes(testAvcEvent))
+ .withId('sample-eventid')
+ .withType('sample-test-type')
+ .withSource(URI.create(sourceSystem as String))
+ .withExtension('correlationid', cmHandleId).build()
+
+ }
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.onap.cps.api.CpsAnchorService
+import org.onap.cps.api.CpsDataService
+import org.onap.cps.api.model.Anchor
+import org.onap.cps.cpspath.parser.CpsPathUtil
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
+import org.onap.cps.ncmp.events.avc1_0_0.Data
+import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges
+import org.onap.cps.ncmp.events.avc1_0_0.Edit
+import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch
+import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate
+import org.onap.cps.utils.JsonObjectMapper
+import org.onap.cps.utils.YangParser
+import spock.lang.Specification
+
+class CmAvcEventServiceSpec extends Specification {
+
+ def mockCpsDataService = Mock(CpsDataService)
+ def mockCpsAnchorService = Mock(CpsAnchorService)
+ def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
+ def mockYangParser = Mock(YangParser)
+
+ def static NO_TIMESTAMP = null
+ def static NO_XPATH = ''
+
+ def objectUnderTest = new CmAvcEventService(
+ mockCpsDataService,
+ mockCpsAnchorService,
+ jsonObjectMapper,
+ mockYangParser
+ )
+
+ def cmHandleId = 'test-cmhandle-id'
+ def sampleJson = '{"some-data": "test-data"}'
+
+ def 'process CREATE operation'() {
+ given: 'An edit with CREATE operation'
+ def testAvcEventForCreate = testAvcEvent('create', NO_XPATH)
+ when: 'The AVC event is processed'
+ objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForCreate)
+ then: 'data is saved via cps service'
+ 1 * mockCpsDataService.saveData(_, cmHandleId, sampleJson, NO_TIMESTAMP)
+ }
+
+ def 'Process UPDATE operation'() {
+ given: 'An edit with UPDATE operation and a valid target path'
+ def targetPath = '/test/path'
+ def anchor = Mock(Anchor)
+ mockCpsAnchorService.getAnchor(_, cmHandleId) >> anchor
+ mockYangParser.getCpsPathFromRestConfStylePath(anchor, targetPath) >> '/parsed/cps/path'
+ def testAvcEventForUpdate = testAvcEvent('update', targetPath)
+ when: 'The AVC event is processed'
+ objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForUpdate)
+ then: 'Data node and descendants are updated via CPS service'
+ 1 * mockCpsDataService.updateDataNodeAndDescendants(_, cmHandleId, _, sampleJson, NO_TIMESTAMP, _)
+ }
+
+ def 'Process PATCH operation'() {
+ given: 'An edit with PATCH operation and a valid target path'
+ def targetPath = '/test/path'
+ def anchor = Mock(Anchor)
+ mockCpsAnchorService.getAnchor(_, cmHandleId) >> anchor
+ mockYangParser.getCpsPathFromRestConfStylePath(anchor, targetPath) >> '/parsed/cps/path'
+ def testAvcEventForPatch = testAvcEvent('patch', targetPath)
+ when: 'The AVC event is processed'
+ objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForPatch)
+ then: 'Node leaves are updated via CPS service'
+ 1 * mockCpsDataService.updateNodeLeaves(_, cmHandleId, _, sampleJson, NO_TIMESTAMP, _)
+ }
+
+ def 'Process DELETE operation with target'() {
+ given: 'An edit with DELETE operation and a specific target path'
+ def targetPath = '/test/path'
+ def anchor = Mock(Anchor)
+ mockCpsAnchorService.getAnchor(_, cmHandleId) >> anchor
+ mockYangParser.getCpsPathFromRestConfStylePath(anchor, targetPath) >> '/parsed/cps/path'
+ def testAvcEventForDelete = testAvcEvent('delete', targetPath)
+ when: 'The AVC event is processed'
+ objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForDelete)
+ then: 'Data node is deleted at the given path'
+ 1 * mockCpsDataService.deleteDataNode(_, cmHandleId, '/parsed/cps/path', NO_TIMESTAMP)
+ }
+
+ def 'Process DELETE operation with no target (delete all)'() {
+ given: 'An edit with DELETE operation and no target'
+ def testAvcEventForDelete = testAvcEvent('delete', NO_XPATH)
+ when: 'The AVC event is processed'
+ objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForDelete)
+ then: 'All data nodes for the cmHandleId are deleted'
+ 1 * mockCpsDataService.deleteDataNodes(_, cmHandleId, NO_TIMESTAMP)
+ where: 'following targets are used'
+ target << [null, '']
+ }
+
+ def 'Resolve parent xpath correctly: #scenario'() {
+ expect: 'Parent xpath is resolved as expected'
+ assert objectUnderTest.resolveParentNodeXpath(inputXpath) == expectedXpath
+ where: 'following scenarios are used'
+ scenario | inputXpath || expectedXpath
+ 'when parentXpath is empty' | '' || CpsPathUtil.ROOT_NODE_XPATH
+ 'when parentXpath is not empty' | '/test/path' || '/test/path'
+ }
+
+ def testAvcEvent(operation, targetXpath) {
+ new AvcEvent(
+ data: new Data(
+ pushChangeUpdate: new PushChangeUpdate(
+ datastoreChanges: new DatastoreChanges(
+ ietfYangPatchYangPatch: new IetfYangPatchYangPatch(
+ patchId: 'test-patch-id',
+ edit: [
+ new Edit(
+ operation: operation,
+ editId: 'test-edit-id',
+ target: targetXpath,
+ value: sampleJson
+ )
+ ]
+ )
+ )
+ )
+ )
+ )
+ }
+
+}
package org.onap.cps.utils;
+import static org.onap.cps.utils.RestConfStylePathToCpsPathUtil.convertToCpsPath;
import static org.onap.cps.utils.YangParserHelper.VALIDATE_AND_PARSE;
import static org.onap.cps.utils.YangParserHelper.VALIDATE_ONLY;
/**
* Parses data into (normalized) ContainerNode according to schema context for the given anchor.
*
- * @param nodeData data string
- * @param anchor the anchor for the node data
+ * @param nodeData data string
+ * @param anchor the anchor for the node data
* @return the NormalizedNode object
*/
@Timed(value = "cps.utils.yangparser.nodedata.with.parent.parse",
- description = "Time taken to parse node data with a parent")
- public ContainerNode parseData(final ContentType contentType,
- final String nodeData,
- final Anchor anchor,
- final String parentNodeXpath) {
+ description = "Time taken to parse node data with a parent")
+ public ContainerNode parseData(final ContentType contentType, final String nodeData, final Anchor anchor,
+ final String parentNodeXpath) {
final SchemaContext schemaContext = getSchemaContext(anchor);
try {
- return yangParserHelper
- .parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_AND_PARSE);
+ return yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath,
+ VALIDATE_AND_PARSE);
} catch (final DataValidationException e) {
invalidateCache(anchor);
}
/**
* Parses data into (normalized) ContainerNode according to schema context for the given yang resource.
*
- * @param nodeData data string
- * @param yangResourceContentPerName yang resource content per name
- * @return the NormalizedNode object
+ * @param nodeData data string
+ * @param yangResourceContentPerName yang resource content per name
+ * @return the NormalizedNode object
*/
@Timed(value = "cps.utils.yangparser.nodedata.with.parent.with.yangResourceMap.parse",
description = "Time taken to parse node data with a parent")
- public ContainerNode parseData(final ContentType contentType,
- final String nodeData,
- final Map<String, String> yangResourceContentPerName,
- final String parentNodeXpath) {
+ public ContainerNode parseData(final ContentType contentType, final String nodeData,
+ final Map<String, String> yangResourceContentPerName, final String parentNodeXpath) {
final SchemaContext schemaContext = getSchemaContext(yangResourceContentPerName);
return yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_AND_PARSE);
}
/**
* Parses data to validate it, using the schema context for given anchor.
*
- * @param anchor the anchor used for node data validation
- * @param parentNodeXpath the xpath of the parent node
- * @param nodeData JSON or XML data string to validate
- * @param contentType the content type of the data (e.g., JSON or XML)
- * @throws DataValidationException if validation fails
+ * @param anchor the anchor used for node data validation
+ * @param parentNodeXpath the xpath of the parent node
+ * @param nodeData JSON or XML data string to validate
+ * @param contentType the content type of the data (e.g., JSON or XML)
+ * @throws DataValidationException if validation fails
*/
- public void validateData(final ContentType contentType,
- final String nodeData,
- final Anchor anchor,
- final String parentNodeXpath) {
+ public void validateData(final ContentType contentType, final String nodeData, final Anchor anchor,
+ final String parentNodeXpath) {
final SchemaContext schemaContext = getSchemaContext(anchor);
try {
yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_ONLY);
yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_ONLY);
}
+ /**
+ * Get Cps path from Restconf path.
+ *
+ * @param anchor anchor
+ * @param restConfStylePath restconf path
+ * @return CpsPath
+ */
+ public String getCpsPathFromRestConfStylePath(final Anchor anchor, final String restConfStylePath) {
+ final SchemaContext schemaContext = getSchemaContext(anchor);
+ return convertToCpsPath(restConfStylePath, schemaContext);
+ }
+
private SchemaContext getSchemaContext(final Anchor anchor) {
- return yangTextSchemaSourceSetCache.get(anchor.getDataspaceName(),
- anchor.getSchemaSetName()).getSchemaContext();
+ return yangTextSchemaSourceSetCache.get(anchor.getDataspaceName(), anchor.getSchemaSetName())
+ .getSchemaContext();
}
private SchemaContext getSchemaContext(final Map<String, String> yangResourceContentPerName) {
return timedYangTextSchemaSourceSetBuilder.getYangTextSchemaSourceSet(yangResourceContentPerName)
- .getSchemaContext();
+ .getSchemaContext();
}
private void invalidateCache(final Anchor anchor) {
import org.onap.cps.TestUtils
import org.onap.cps.api.exceptions.DataValidationException
import org.onap.cps.api.model.Anchor
+import org.onap.cps.impl.YangTextSchemaSourceSetCache
import org.onap.cps.yang.TimedYangTextSchemaSourceSetBuilder
import org.onap.cps.yang.YangTextSchemaSourceSet
+import org.opendaylight.yangtools.yang.common.QName
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode
+import org.opendaylight.yangtools.yang.model.api.ListSchemaNode
import org.opendaylight.yangtools.yang.model.api.SchemaContext
import spock.lang.Specification
-import org.onap.cps.impl.YangTextSchemaSourceSetCache
class YangParserSpec extends Specification {
mockYangTextSchemaSourceSet.getSchemaContext() >> mockSchemaContext
}
+ def 'Convert RestConf-style path to CPS path'() {
+ given: 'a RestConf-style path'
+ def restConfStylePath = '/bookstore:book=Book1'
+ def expectedCpsPath = '/book[@name=\'Book1\']'
+ and: 'a schema context that contains the matching node'
+ def mockedBookNode = Mock(ListSchemaNode) {
+ getQName() >> QName.create('bookstore', 'book')
+ getKeyDefinition() >> [QName.create('bookstore', 'name')]
+ }
+ mockSchemaContext.getChildNodes() >> [mockedBookNode]
+ mockedBookNode.getChildNodes() >> []
+ when: 'restconf style path is converted to cps path'
+ def result = objectUnderTest.getCpsPathFromRestConfStylePath(anchor, restConfStylePath)
+ then: 'the schema context is retrieved from the cache for the correct anchor'
+ 1 * mockYangTextSchemaSourceSetCache.get('my dataspace', 'my schema') >> mockYangTextSchemaSourceSet
+ and: 'the CPS path is returned correctly'
+ assert result == expectedCpsPath
+ }
+
def 'Parsing data.'() {
given: 'the yang parser (utility) always returns a container node'
mockYangParserHelper.parseData(ContentType.JSON, 'some json', mockSchemaContext, noParent, validateAndParse) >> containerNodeFromYangUtils