From f7255dd331914348f6d313cbc3e216f44b165922 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Mon, 21 Jul 2025 12:21:46 +0100 Subject: [PATCH] Process Cm Avc Event in NCMP - process the cm avc event when the source system is set as ONAP DMI Plugin in the header - handle the create , update , patch and delete operation to the CPS Cache. - Testware to support the write operation - Usage of the RestConf to Cps Path utility in this patch Issue-ID: CPS-2777 Change-Id: I43233cf52f411a5d3cc20b03d0ef6e6e50399aa7 Signed-off-by: mpriyank --- .../cmavc/CmAvcEventConsumer.java | 33 ++++- .../cmavc/CmAvcEventService.java | 151 +++++++++++++++++++++ .../cmavc/CmAvcOperationEnum.java | 66 +++++++++ .../cmavc/CmAvcEventConsumerSpec.groovy | 107 ++++++++++----- .../cmavc/CmAvcEventServiceSpec.groovy | 148 ++++++++++++++++++++ .../main/java/org/onap/cps/utils/YangParser.java | 63 +++++---- .../org/onap/cps/utils/YangParserSpec.groovy | 23 +++- 7 files changed, 526 insertions(+), 65 deletions(-) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventService.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcOperationEnum.java create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventServiceSpec.groovy diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java index c8aec3e9e4..f504207e3f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java @@ -20,12 +20,18 @@ package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc; +import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent; + import io.cloudevents.CloudEvent; +import io.cloudevents.kafka.impl.KafkaHeaders; import io.micrometer.core.annotation.Timed; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; import org.onap.cps.events.EventsProducer; +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent; +import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; @@ -41,25 +47,48 @@ import org.springframework.stereotype.Component; public class CmAvcEventConsumer { + private static final String CLOUD_EVENT_SOURCE_SYSTEM_HEADER_KEY = "ce_source"; + @Value("${app.ncmp.avc.cm-events-topic}") private String cmEventsTopicName; private final EventsProducer eventsProducer; + private final CmAvcEventService cmAvcEventService; + private final InventoryPersistence inventoryPersistence; /** * Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic. * The key from incoming record will be used as key for the target topic as well to preserve the message ordering. + * If event is coming from ONAP-DMI-PLUGIN then the event will also be processed by NCMP and the cps cache/database + * will be updated. * * @param cmAvcEventAsConsumerRecord Incoming raw consumer record */ @KafkaListener(topics = "${app.dmi.cm-events.topic}", containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") @Timed(value = "cps.ncmp.cmnotifications.consumeandforward", description = "Time taken to forward CM AVC events") - public void consumeAndForward( - final ConsumerRecord cmAvcEventAsConsumerRecord) { + public void consumeAndForward(final ConsumerRecord cmAvcEventAsConsumerRecord) { + if (isEventFromOnapDmiPlugin(cmAvcEventAsConsumerRecord.headers())) { + processCmAvcEventChanges(cmAvcEventAsConsumerRecord); + } final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value(); final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key(); log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent); eventsProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent); } + + private void processCmAvcEventChanges(final ConsumerRecord cmAvcEventAsConsumerRecord) { + final String cmHandleId = cmAvcEventAsConsumerRecord.key(); + final Boolean dataSyncEnabled = inventoryPersistence.getCmHandleState(cmHandleId).getDataSyncEnabled(); + if (Boolean.TRUE.equals(dataSyncEnabled)) { + final AvcEvent cmAvcEvent = toTargetEvent(cmAvcEventAsConsumerRecord.value(), AvcEvent.class); + log.debug("Event to be processed to update the cache with cmHandleId : {}", cmHandleId); + cmAvcEventService.processCmAvcEvent(cmHandleId, cmAvcEvent); + } + } + + private boolean isEventFromOnapDmiPlugin(final Headers headers) { + final String sourceSystem = KafkaHeaders.getParsedKafkaHeader(headers, CLOUD_EVENT_SOURCE_SYSTEM_HEADER_KEY); + return sourceSystem != null && sourceSystem.equals("ONAP-DMI-PLUGIN"); + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventService.java new file mode 100644 index 0000000000..061fe56c8a --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventService.java @@ -0,0 +1,151 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc; + +import static org.onap.cps.cpspath.parser.CpsPathUtil.NO_PARENT_PATH; +import static org.onap.cps.cpspath.parser.CpsPathUtil.getNormalizedParentXpath; +import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME; +import static org.onap.cps.utils.ContentType.JSON; + +import java.time.OffsetDateTime; +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.api.CpsAnchorService; +import org.onap.cps.api.CpsDataService; +import org.onap.cps.api.model.Anchor; +import org.onap.cps.cpspath.parser.CpsPathUtil; +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent; +import org.onap.cps.ncmp.events.avc1_0_0.Edit; +import org.onap.cps.utils.JsonObjectMapper; +import org.onap.cps.utils.YangParser; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class CmAvcEventService { + + private static final OffsetDateTime NO_TIMESTAMP = null; + + private final CpsDataService cpsDataService; + private final CpsAnchorService cpsAnchorService; + private final JsonObjectMapper jsonObjectMapper; + private final YangParser yangParser; + + /** + * Process the incoming AvcEvent and apply the changes to the cache. + * + * @param cmHandleId cm handle identifier + * @param cmAvcEvent cm avc event + */ + public void processCmAvcEvent(final String cmHandleId, final AvcEvent cmAvcEvent) { + + final List edits = + cmAvcEvent.getData().getPushChangeUpdate().getDatastoreChanges().getIetfYangPatchYangPatch().getEdit(); + + edits.forEach( + edit -> handleCmAvcEventOperation(CmAvcOperationEnum.fromValue(edit.getOperation()), cmHandleId, edit)); + } + + private void handleCmAvcEventOperation(final CmAvcOperationEnum cmAvcOperation, final String cmHandleId, + final Edit cmAvcEventEdit) { + + log.info("Operation : {} requested for cmHandleId : {}", cmAvcOperation.getValue(), cmHandleId); + + switch (cmAvcOperation) { + case CREATE: + handleCreate(cmHandleId, cmAvcEventEdit); + break; + + case UPDATE: + handleUpdate(cmHandleId, cmAvcEventEdit); + break; + + case PATCH: + handlePatch(cmHandleId, cmAvcEventEdit); + break; + + case DELETE: + handleDelete(cmHandleId, cmAvcEventEdit); + break; + + default: + log.error("Unhandled operation : {} for cmHandleId : {}", cmAvcOperation, cmHandleId); + } + } + + private void handleCreate(final String cmHandleId, final Edit cmAvcEventEdit) { + log.debug("Handling create operation for cmHandleId : {}", cmHandleId); + final String jsonData = extractNodeData(cmAvcEventEdit); + cpsDataService.saveData(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, jsonData, NO_TIMESTAMP); + } + + private void handleUpdate(final String cmHandleId, final Edit cmAvcEventEdit) { + final String jsonData = extractNodeData(cmAvcEventEdit); + final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget()); + final String parentXpath = getNormalizedParentXpath(cpsPathFromRestConfStylePath); + log.debug("Handling update operation for cmHandleId : {} , cpsPath : {} and parent-xpath : {}", cmHandleId, + cpsPathFromRestConfStylePath, parentXpath); + cpsDataService.updateDataNodeAndDescendants(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, + resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON); + } + + private void handlePatch(final String cmHandleId, final Edit cmAvcEventEdit) { + final String jsonData = extractNodeData(cmAvcEventEdit); + final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget()); + final String parentXpath = getNormalizedParentXpath(cpsPathFromRestConfStylePath); + log.debug("Handling patch operation for cmHandleId : {} , cpsPath : {} and parent-xpath : {}", cmHandleId, + cpsPathFromRestConfStylePath, parentXpath); + cpsDataService.updateNodeLeaves(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, + resolveParentNodeXpath(parentXpath), jsonData, NO_TIMESTAMP, JSON); + + } + + private void handleDelete(final String cmHandleId, final Edit cmAvcEventEdit) { + if (NO_PARENT_PATH.equals(cmAvcEventEdit.getTarget()) || cmAvcEventEdit.getTarget() == null) { + log.debug("Deleting all the entries for cmHandleId : {}", cmHandleId); + cpsDataService.deleteDataNodes(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, NO_TIMESTAMP); + } else { + final String cpsPathFromRestConfStylePath = getCpsPath(cmHandleId, cmAvcEventEdit.getTarget()); + log.debug("Deleting data for xpath : {}", cpsPathFromRestConfStylePath); + cpsDataService.deleteDataNode(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, + cpsPathFromRestConfStylePath, NO_TIMESTAMP); + + } + } + + private String extractNodeData(final Edit cmAvcEventEdit) { + return jsonObjectMapper.convertJsonString(jsonObjectMapper.asJsonString(cmAvcEventEdit.getValue()), + String.class); + } + + private String getCpsPath(final String cmHandleId, final String restConfStylePath) { + log.debug("Getting cps path from the restconfpath : {}", restConfStylePath); + final Anchor anchor = cpsAnchorService.getAnchor(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId); + return yangParser.getCpsPathFromRestConfStylePath(anchor, restConfStylePath); + } + + private String resolveParentNodeXpath(final String parentNodeXpath) { + return parentNodeXpath.isEmpty() ? CpsPathUtil.ROOT_NODE_XPATH : parentNodeXpath; + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcOperationEnum.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcOperationEnum.java new file mode 100644 index 0000000000..09c4921b7b --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcOperationEnum.java @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc; + +import com.fasterxml.jackson.annotation.JsonValue; + +/* + Enum for valid Cm Avc Event operations. + */ +public enum CmAvcOperationEnum { + + READ("read"), + + CREATE("create"), + + UPDATE("update"), + + PATCH("patch"), + + DELETE("delete"); + + private final String value; + + CmAvcOperationEnum(final String value) { + this.value = value; + } + + @JsonValue + public String getValue() { + return value; + } + + /** + * Returns the Operation Enum. + * + * @param value string operation + * @return CmAvcOperationEnum + */ + public static CmAvcOperationEnum fromValue(final String value) { + for (final CmAvcOperationEnum b : CmAvcOperationEnum.values()) { + if (b.value.equals(value)) { + return b; + } + } + throw new IllegalArgumentException("Unexpected value '" + value + "'"); + } + +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy index b0a8f20ccb..9cf3684832 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy @@ -28,8 +28,9 @@ import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.cps.events.EventsProducer +import org.onap.cps.ncmp.api.inventory.models.CompositeState import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent -import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.ncmp.impl.inventory.InventoryPersistence import org.onap.cps.ncmp.utils.events.MessagingBaseSpec import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean @@ -37,8 +38,11 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext import org.testcontainers.spock.Testcontainers + +import java.nio.charset.Charset import java.time.Duration +import static org.onap.cps.ncmp.utils.TestUtils.getResourceFileContent import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent @SpringBootTest(classes = [EventsProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper]) @@ -49,49 +53,84 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec { @SpringBean EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + def mockCmAvcEventService = Mock(CmAvcEventService) + def mockInventoryPersistence = Mock(InventoryPersistence) + @SpringBean - CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsProducer) + CmAvcEventConsumer cmAvcEventConsumer = new CmAvcEventConsumer(eventsProducer, mockCmAvcEventService, mockInventoryPersistence) @Autowired JsonObjectMapper jsonObjectMapper - def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer)) - - def 'Consume and forward valid message'() { - given: 'consumer has a subscription on a topic' - def cmEventsTopicName = 'cm-events' - acvEventConsumer.cmEventsTopicName = cmEventsTopicName - cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List) - and: 'an event is sent' - def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') - def testEventKey = 'sample-eventid-key' - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) - def testCloudEventSent = CloudEventBuilder.v1() - .withData(jsonObjectMapper.asJsonBytes(testEventSent)) - .withId('sample-eventid') - .withType('sample-test-type') - .withSource(URI.create('sample-test-source')) - .withExtension('correlationid', 'test-cmhandle1').build() - and: 'event has header information' - def consumerRecord = new ConsumerRecord(cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent) + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('group for Test A', CloudEventDeserializer)) + + def testAvcEvent + def testEventKey + + def setup() { + testEventKey = 'sample-key' + testAvcEvent = jsonObjectMapper.convertJsonString(getResourceFileContent('sampleAvcInputEvent.json'), AvcEvent.class) + } + + def 'Test A : Consume and forward valid message'() { + given: 'a cloud event' + def testCloudEventSent = buildCloudEvent('sample-source', 'test-cmhandle1') + and: 'consumer has a subscription on the target topic for this test' + cmAvcEventConsumer.cmEventsTopicName = 'target-topic-for-Test-A' + cloudEventKafkaConsumer.subscribe([cmAvcEventConsumer.cmEventsTopicName]) + and: 'event is wrapped in a consumer record with message key(cmHandleId) and value as cloud event' + def consumerRecordReceived = new ConsumerRecord(cmAvcEventConsumer.cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent) when: 'the event is consumed and forwarded to target topic' - acvEventConsumer.consumeAndForward(consumerRecord) - and: 'the target topic is polled' - def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)) - then: 'poll returns one record' - assert records.size() == 1 - and: 'target record can be converted to AVC event' - def record = records.iterator().next() - def cloudEvent = record.value() as CloudEvent - def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class) + cmAvcEventConsumer.consumeAndForward(consumerRecordReceived) + then: 'the consumer record can be read from the target topic within 2 seconds' + def consumerRecordOnTargetTopic = cloudEventKafkaConsumer.poll(Duration.ofMillis(2000)).iterator().next() and: 'the target event has the same key as the source event to maintain the ordering in a partition' - assert record.key() == consumerRecord.key() + def cloudEventFromTargetTopic = consumerRecordOnTargetTopic.value() as CloudEvent + def avcEventFromTargetTopic = toTargetEvent(cloudEventFromTargetTopic, AvcEvent.class) + assert consumerRecordOnTargetTopic.key() == consumerRecordReceived.key() and: 'we have correct headers forwarded where correlation id matches' - assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOnTargetTopic.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id is same between consumed and forwarded' - assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') == 'sample-eventid' + assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOnTargetTopic.headers(), 'ce_id') == 'sample-eventid' and: 'the event payload still matches' - assert testEventSent == convertedAvcEvent + assert avcEventFromTargetTopic == testAvcEvent } + def 'Test B : Consume and process CM Avc Event when #scenario'() { + given: 'a cloud event is created(what we get from ONAP-DMI-PLUGIN)' + def sourceSystem = 'ONAP-DMI-PLUGIN' + def testCloudEventSent = buildCloudEvent(sourceSystem, 'some-cmhandle-id') + and: 'a separate topic for this test' + cmAvcEventConsumer.cmEventsTopicName = 'some-topic-for-Test-B' + and: 'inventory persistence service has #scenario' + def compositeState = new CompositeState(dataSyncEnabled: dataSyncFlag) + 1 * mockInventoryPersistence.getCmHandleState(_) >> compositeState + and: 'event has source system as ONAP-DMI-PLUGIN and key(cmHandleId) and value as cloud event' + def consumerRecord = new ConsumerRecord(cmAvcEventConsumer.cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent) + consumerRecord.headers().add('ce_source', sourceSystem.getBytes(Charset.defaultCharset())) + when: 'the event is consumed' + cmAvcEventConsumer.consumeAndForward(consumerRecord) + then: 'cm avc event is processed for updating the cached data' + expectedCallToProcessCmAvcEvent * mockCmAvcEventService.processCmAvcEvent(testEventKey, _) >> { args -> + { + assert args[1] instanceof AvcEvent + } + } + where: 'following scenarios are used' + scenario | dataSyncFlag || expectedCallToProcessCmAvcEvent + 'data sync flag enabled' | true || 1 + 'data sync flag disabled' | false || 0 + + } + + def buildCloudEvent(sourceSystem, cmHandleId){ + + return CloudEventBuilder.v1() + .withData(jsonObjectMapper.asJsonBytes(testAvcEvent)) + .withId('sample-eventid') + .withType('sample-test-type') + .withSource(URI.create(sourceSystem as String)) + .withExtension('correlationid', cmHandleId).build() + + } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventServiceSpec.groovy new file mode 100644 index 0000000000..3822ba3f95 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventServiceSpec.groovy @@ -0,0 +1,148 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc + +import com.fasterxml.jackson.databind.ObjectMapper +import org.onap.cps.api.CpsAnchorService +import org.onap.cps.api.CpsDataService +import org.onap.cps.api.model.Anchor +import org.onap.cps.cpspath.parser.CpsPathUtil +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent +import org.onap.cps.ncmp.events.avc1_0_0.Data +import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges +import org.onap.cps.ncmp.events.avc1_0_0.Edit +import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch +import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate +import org.onap.cps.utils.JsonObjectMapper +import org.onap.cps.utils.YangParser +import spock.lang.Specification + +class CmAvcEventServiceSpec extends Specification { + + def mockCpsDataService = Mock(CpsDataService) + def mockCpsAnchorService = Mock(CpsAnchorService) + def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) + def mockYangParser = Mock(YangParser) + + def static NO_TIMESTAMP = null + def static NO_XPATH = '' + + def objectUnderTest = new CmAvcEventService( + mockCpsDataService, + mockCpsAnchorService, + jsonObjectMapper, + mockYangParser + ) + + def cmHandleId = 'test-cmhandle-id' + def sampleJson = '{"some-data": "test-data"}' + + def 'process CREATE operation'() { + given: 'An edit with CREATE operation' + def testAvcEventForCreate = testAvcEvent('create', NO_XPATH) + when: 'The AVC event is processed' + objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForCreate) + then: 'data is saved via cps service' + 1 * mockCpsDataService.saveData(_, cmHandleId, sampleJson, NO_TIMESTAMP) + } + + def 'Process UPDATE operation'() { + given: 'An edit with UPDATE operation and a valid target path' + def targetPath = '/test/path' + def anchor = Mock(Anchor) + mockCpsAnchorService.getAnchor(_, cmHandleId) >> anchor + mockYangParser.getCpsPathFromRestConfStylePath(anchor, targetPath) >> '/parsed/cps/path' + def testAvcEventForUpdate = testAvcEvent('update', targetPath) + when: 'The AVC event is processed' + objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForUpdate) + then: 'Data node and descendants are updated via CPS service' + 1 * mockCpsDataService.updateDataNodeAndDescendants(_, cmHandleId, _, sampleJson, NO_TIMESTAMP, _) + } + + def 'Process PATCH operation'() { + given: 'An edit with PATCH operation and a valid target path' + def targetPath = '/test/path' + def anchor = Mock(Anchor) + mockCpsAnchorService.getAnchor(_, cmHandleId) >> anchor + mockYangParser.getCpsPathFromRestConfStylePath(anchor, targetPath) >> '/parsed/cps/path' + def testAvcEventForPatch = testAvcEvent('patch', targetPath) + when: 'The AVC event is processed' + objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForPatch) + then: 'Node leaves are updated via CPS service' + 1 * mockCpsDataService.updateNodeLeaves(_, cmHandleId, _, sampleJson, NO_TIMESTAMP, _) + } + + def 'Process DELETE operation with target'() { + given: 'An edit with DELETE operation and a specific target path' + def targetPath = '/test/path' + def anchor = Mock(Anchor) + mockCpsAnchorService.getAnchor(_, cmHandleId) >> anchor + mockYangParser.getCpsPathFromRestConfStylePath(anchor, targetPath) >> '/parsed/cps/path' + def testAvcEventForDelete = testAvcEvent('delete', targetPath) + when: 'The AVC event is processed' + objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForDelete) + then: 'Data node is deleted at the given path' + 1 * mockCpsDataService.deleteDataNode(_, cmHandleId, '/parsed/cps/path', NO_TIMESTAMP) + } + + def 'Process DELETE operation with no target (delete all)'() { + given: 'An edit with DELETE operation and no target' + def testAvcEventForDelete = testAvcEvent('delete', NO_XPATH) + when: 'The AVC event is processed' + objectUnderTest.processCmAvcEvent(cmHandleId, testAvcEventForDelete) + then: 'All data nodes for the cmHandleId are deleted' + 1 * mockCpsDataService.deleteDataNodes(_, cmHandleId, NO_TIMESTAMP) + where: 'following targets are used' + target << [null, ''] + } + + def 'Resolve parent xpath correctly: #scenario'() { + expect: 'Parent xpath is resolved as expected' + assert objectUnderTest.resolveParentNodeXpath(inputXpath) == expectedXpath + where: 'following scenarios are used' + scenario | inputXpath || expectedXpath + 'when parentXpath is empty' | '' || CpsPathUtil.ROOT_NODE_XPATH + 'when parentXpath is not empty' | '/test/path' || '/test/path' + } + + def testAvcEvent(operation, targetXpath) { + new AvcEvent( + data: new Data( + pushChangeUpdate: new PushChangeUpdate( + datastoreChanges: new DatastoreChanges( + ietfYangPatchYangPatch: new IetfYangPatchYangPatch( + patchId: 'test-patch-id', + edit: [ + new Edit( + operation: operation, + editId: 'test-edit-id', + target: targetXpath, + value: sampleJson + ) + ] + ) + ) + ) + ) + ) + } + +} diff --git a/cps-service/src/main/java/org/onap/cps/utils/YangParser.java b/cps-service/src/main/java/org/onap/cps/utils/YangParser.java index 5dfeb2fb3f..a0846a7d7b 100644 --- a/cps-service/src/main/java/org/onap/cps/utils/YangParser.java +++ b/cps-service/src/main/java/org/onap/cps/utils/YangParser.java @@ -21,6 +21,7 @@ package org.onap.cps.utils; +import static org.onap.cps.utils.RestConfStylePathToCpsPathUtil.convertToCpsPath; import static org.onap.cps.utils.YangParserHelper.VALIDATE_AND_PARSE; import static org.onap.cps.utils.YangParserHelper.VALIDATE_ONLY; @@ -48,20 +49,18 @@ public class YangParser { /** * Parses data into (normalized) ContainerNode according to schema context for the given anchor. * - * @param nodeData data string - * @param anchor the anchor for the node data + * @param nodeData data string + * @param anchor the anchor for the node data * @return the NormalizedNode object */ @Timed(value = "cps.utils.yangparser.nodedata.with.parent.parse", - description = "Time taken to parse node data with a parent") - public ContainerNode parseData(final ContentType contentType, - final String nodeData, - final Anchor anchor, - final String parentNodeXpath) { + description = "Time taken to parse node data with a parent") + public ContainerNode parseData(final ContentType contentType, final String nodeData, final Anchor anchor, + final String parentNodeXpath) { final SchemaContext schemaContext = getSchemaContext(anchor); try { - return yangParserHelper - .parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_AND_PARSE); + return yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, + VALIDATE_AND_PARSE); } catch (final DataValidationException e) { invalidateCache(anchor); } @@ -71,16 +70,14 @@ public class YangParser { /** * Parses data into (normalized) ContainerNode according to schema context for the given yang resource. * - * @param nodeData data string - * @param yangResourceContentPerName yang resource content per name - * @return the NormalizedNode object + * @param nodeData data string + * @param yangResourceContentPerName yang resource content per name + * @return the NormalizedNode object */ @Timed(value = "cps.utils.yangparser.nodedata.with.parent.with.yangResourceMap.parse", description = "Time taken to parse node data with a parent") - public ContainerNode parseData(final ContentType contentType, - final String nodeData, - final Map yangResourceContentPerName, - final String parentNodeXpath) { + public ContainerNode parseData(final ContentType contentType, final String nodeData, + final Map yangResourceContentPerName, final String parentNodeXpath) { final SchemaContext schemaContext = getSchemaContext(yangResourceContentPerName); return yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_AND_PARSE); } @@ -88,16 +85,14 @@ public class YangParser { /** * Parses data to validate it, using the schema context for given anchor. * - * @param anchor the anchor used for node data validation - * @param parentNodeXpath the xpath of the parent node - * @param nodeData JSON or XML data string to validate - * @param contentType the content type of the data (e.g., JSON or XML) - * @throws DataValidationException if validation fails + * @param anchor the anchor used for node data validation + * @param parentNodeXpath the xpath of the parent node + * @param nodeData JSON or XML data string to validate + * @param contentType the content type of the data (e.g., JSON or XML) + * @throws DataValidationException if validation fails */ - public void validateData(final ContentType contentType, - final String nodeData, - final Anchor anchor, - final String parentNodeXpath) { + public void validateData(final ContentType contentType, final String nodeData, final Anchor anchor, + final String parentNodeXpath) { final SchemaContext schemaContext = getSchemaContext(anchor); try { yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_ONLY); @@ -109,14 +104,26 @@ public class YangParser { yangParserHelper.parseData(contentType, nodeData, schemaContext, parentNodeXpath, VALIDATE_ONLY); } + /** + * Get Cps path from Restconf path. + * + * @param anchor anchor + * @param restConfStylePath restconf path + * @return CpsPath + */ + public String getCpsPathFromRestConfStylePath(final Anchor anchor, final String restConfStylePath) { + final SchemaContext schemaContext = getSchemaContext(anchor); + return convertToCpsPath(restConfStylePath, schemaContext); + } + private SchemaContext getSchemaContext(final Anchor anchor) { - return yangTextSchemaSourceSetCache.get(anchor.getDataspaceName(), - anchor.getSchemaSetName()).getSchemaContext(); + return yangTextSchemaSourceSetCache.get(anchor.getDataspaceName(), anchor.getSchemaSetName()) + .getSchemaContext(); } private SchemaContext getSchemaContext(final Map yangResourceContentPerName) { return timedYangTextSchemaSourceSetBuilder.getYangTextSchemaSourceSet(yangResourceContentPerName) - .getSchemaContext(); + .getSchemaContext(); } private void invalidateCache(final Anchor anchor) { diff --git a/cps-service/src/test/groovy/org/onap/cps/utils/YangParserSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/utils/YangParserSpec.groovy index cb7a16cb84..a1a0b8cef1 100644 --- a/cps-service/src/test/groovy/org/onap/cps/utils/YangParserSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/utils/YangParserSpec.groovy @@ -24,12 +24,14 @@ package org.onap.cps.utils import org.onap.cps.TestUtils import org.onap.cps.api.exceptions.DataValidationException import org.onap.cps.api.model.Anchor +import org.onap.cps.impl.YangTextSchemaSourceSetCache import org.onap.cps.yang.TimedYangTextSchemaSourceSetBuilder import org.onap.cps.yang.YangTextSchemaSourceSet +import org.opendaylight.yangtools.yang.common.QName import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode +import org.opendaylight.yangtools.yang.model.api.ListSchemaNode import org.opendaylight.yangtools.yang.model.api.SchemaContext import spock.lang.Specification -import org.onap.cps.impl.YangTextSchemaSourceSetCache class YangParserSpec extends Specification { @@ -54,6 +56,25 @@ class YangParserSpec extends Specification { mockYangTextSchemaSourceSet.getSchemaContext() >> mockSchemaContext } + def 'Convert RestConf-style path to CPS path'() { + given: 'a RestConf-style path' + def restConfStylePath = '/bookstore:book=Book1' + def expectedCpsPath = '/book[@name=\'Book1\']' + and: 'a schema context that contains the matching node' + def mockedBookNode = Mock(ListSchemaNode) { + getQName() >> QName.create('bookstore', 'book') + getKeyDefinition() >> [QName.create('bookstore', 'name')] + } + mockSchemaContext.getChildNodes() >> [mockedBookNode] + mockedBookNode.getChildNodes() >> [] + when: 'restconf style path is converted to cps path' + def result = objectUnderTest.getCpsPathFromRestConfStylePath(anchor, restConfStylePath) + then: 'the schema context is retrieved from the cache for the correct anchor' + 1 * mockYangTextSchemaSourceSetCache.get('my dataspace', 'my schema') >> mockYangTextSchemaSourceSet + and: 'the CPS path is returned correctly' + assert result == expectedCpsPath + } + def 'Parsing data.'() { given: 'the yang parser (utility) always returns a container node' mockYangParserHelper.parseData(ContentType.JSON, 'some json', mockSchemaContext, noParent, validateAndParse) >> containerNodeFromYangUtils -- 2.16.6