From ca7272f782299872c7d3c4dab87434cce7772c67 Mon Sep 17 00:00:00 2001 From: mpriyank Date: Mon, 14 Jul 2025 10:32:23 +0100 Subject: [PATCH] Cm Avc Event for passthrough write operation - Cm Avc event is sent when there is a successful passthrough WRITE(create,update,delete) operation - This event will be used by NCMP to update the cache - Added testware for this scenario and also added a missing test for enabling data sync flag Issue-ID: CPS-2877 Change-Id: I4ceb196a4598dac2c2560decbec0ab8ce088c610 Signed-off-by: mpriyank --- .../ncmp/dmi/cmstack/avc/CmAvcEventService.java | 115 +++++++++++++++++++++ .../onap/cps/ncmp/dmi/service/DmiServiceImpl.java | 18 +++- dmi-service/src/main/resources/application.yml | 1 + .../dmi/cmstack/avc/CmAvcEventServiceSpec.groovy | 68 ++++++++++++ .../cps/ncmp/dmi/service/DmiServiceImplSpec.groovy | 35 +++++-- 5 files changed, 223 insertions(+), 14 deletions(-) create mode 100644 dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventService.java create mode 100644 dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventServiceSpec.groovy diff --git a/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventService.java b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventService.java new file mode 100644 index 00000000..659c893b --- /dev/null +++ b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventService.java @@ -0,0 +1,115 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.cmstack.avc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import java.net.URI; +import java.util.List; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.dmi.model.DataAccessRequest; +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent; +import org.onap.cps.ncmp.events.avc1_0_0.Data; +import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges; +import org.onap.cps.ncmp.events.avc1_0_0.Edit; +import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch; +import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class CmAvcEventService { + + @Value("${app.dmi.avc.cm-avc-events-topic:dmi-cm-events}") + private String dmiCmAvcEventsTopic; + + private final ObjectMapper objectMapper; + private final KafkaTemplate cloudEventKafkaTemplate; + + /** + * Handle the event creation and sends event to NCMP. + * + * @param operation Operation + * @param cmHandle cm handle identifier + * @param resourceIdentifier resource identifier + * @param data actual data in the form of JSON + */ + public void sendCmAvcEvent(final DataAccessRequest.OperationEnum operation, final String cmHandle, + final String resourceIdentifier, final String data) { + + final CloudEvent cmAvcEventAsCloudEvent = + convertAvcEventToCloudEvent(cmAvcEventsCreator(operation, resourceIdentifier, data)); + + if (cmAvcEventAsCloudEvent == null) { + log.warn("cmAvcEventAsCloudEvent is null, skipping send for cmHandle: {}", cmHandle); + } else { + cloudEventKafkaTemplate.send(dmiCmAvcEventsTopic, cmHandle, cmAvcEventAsCloudEvent); + } + + } + + private AvcEvent cmAvcEventsCreator(final DataAccessRequest.OperationEnum operation, + final String resourceIdentifier, final String jsonData) { + final AvcEvent avcEvent = new AvcEvent(); + final Data data = new Data(); + final PushChangeUpdate pushChangeUpdate = new PushChangeUpdate(); + final DatastoreChanges datastoreChanges = new DatastoreChanges(); + final IetfYangPatchYangPatch ietfYangPatchYangPatch = new IetfYangPatchYangPatch(); + ietfYangPatchYangPatch.setPatchId(UUID.randomUUID().toString()); + final Edit edit = new Edit(); + edit.setEditId(UUID.randomUUID() + "-edit-id"); + edit.setOperation(operation.getValue()); + edit.setTarget(resourceIdentifier); + edit.setValue(jsonData); + ietfYangPatchYangPatch.setEdit(List.of(edit)); + datastoreChanges.setIetfYangPatchYangPatch(ietfYangPatchYangPatch); + pushChangeUpdate.setDatastoreChanges(datastoreChanges); + data.setPushChangeUpdate(pushChangeUpdate); + avcEvent.setData(data); + return avcEvent; + } + + private CloudEvent convertAvcEventToCloudEvent(final AvcEvent avcEvent) { + + try { + return CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("ONAP-DMI-PLUGIN")) + .withType(AvcEvent.class.getName()) + .withDataSchema(URI.create("urn:cps:" + AvcEvent.class.getName() + ":1.0.0")) + .withData(objectMapper.writeValueAsBytes(avcEvent)) + .build(); + } catch (final JsonProcessingException jsonProcessingException) { + log.error("Unable to convert object to json : {}", jsonProcessingException.getMessage()); + } + + return null; + + } + +} diff --git a/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java index 6bc6cee4..cfb06860 100644 --- a/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java +++ b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.dmi.cmstack.avc.CmAvcEventService; import org.onap.cps.ncmp.dmi.config.DmiPluginConfig.DmiPluginProperties; import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException; import org.onap.cps.ncmp.dmi.exception.DmiException; @@ -59,6 +60,7 @@ public class DmiServiceImpl implements DmiService { private NcmpRestClient ncmpRestClient; private ObjectMapper objectMapper; private DmiPluginProperties dmiPluginProperties; + private CmAvcEventService cmAvcEventService; /** * Constructor. @@ -67,14 +69,18 @@ public class DmiServiceImpl implements DmiService { * @param ncmpRestClient ncmpRestClient * @param sdncOperations sdncOperations * @param objectMapper objectMapper + * @param cmAvcEventService cmAvcEventService */ public DmiServiceImpl(final DmiPluginProperties dmiPluginProperties, - final NcmpRestClient ncmpRestClient, - final SdncOperations sdncOperations, final ObjectMapper objectMapper) { + final NcmpRestClient ncmpRestClient, + final SdncOperations sdncOperations, + final ObjectMapper objectMapper, + final CmAvcEventService cmAvcEventService) { this.dmiPluginProperties = dmiPluginProperties; this.ncmpRestClient = ncmpRestClient; this.objectMapper = objectMapper; this.sdncOperations = sdncOperations; + this.cmAvcEventService = cmAvcEventService; } @Override @@ -160,7 +166,7 @@ public class DmiServiceImpl implements DmiService { resourceIdentifier, optionsParamInQuery, restconfContentQueryParam); - return prepareAndSendResponse(responseEntity, cmHandle); + return checkDeviceResponse(responseEntity, cmHandle); } @Override @@ -170,10 +176,12 @@ public class DmiServiceImpl implements DmiService { final String dataType, final String data) { final ResponseEntity responseEntity = sdncOperations.writeData(operation, cmHandle, resourceIdentifier, dataType, data); - return prepareAndSendResponse(responseEntity, cmHandle); + final String deviceResponse = checkDeviceResponse(responseEntity, cmHandle); + cmAvcEventService.sendCmAvcEvent(operation, cmHandle, resourceIdentifier, data); + return deviceResponse; } - private String prepareAndSendResponse(final ResponseEntity responseEntity, final String cmHandle) { + private String checkDeviceResponse(final ResponseEntity responseEntity, final String cmHandle) { if (responseEntity.getStatusCode().is2xxSuccessful()) { return responseEntity.getBody(); } else { diff --git a/dmi-service/src/main/resources/application.yml b/dmi-service/src/main/resources/application.yml index 7ba9b256..99f739ab 100644 --- a/dmi-service/src/main/resources/application.yml +++ b/dmi-service/src/main/resources/application.yml @@ -76,6 +76,7 @@ app: avc: cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription} cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription} + cm-avc-events-topic: ${DMI_CM_AVC_EVENTS:dmi-cm-events} ves: topicNames: - "unauthenticated.VES_PNFREG_OUTPUT" diff --git a/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventServiceSpec.groovy b/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventServiceSpec.groovy new file mode 100644 index 00000000..052bd1dc --- /dev/null +++ b/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventServiceSpec.groovy @@ -0,0 +1,68 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.cmstack.avc + +import com.fasterxml.jackson.core.JsonProcessingException +import com.fasterxml.jackson.databind.ObjectMapper +import io.cloudevents.CloudEvent +import org.springframework.kafka.core.KafkaTemplate +import spock.lang.Specification + +import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.CREATE +import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.DELETE + +class CmAvcEventServiceSpec extends Specification { + + def mockKafkaTemplate = Mock(KafkaTemplate) + def mockObjectMapper = Mock(ObjectMapper) + def objectUnderTest = new CmAvcEventService(mockObjectMapper, mockKafkaTemplate) + + def setup() { + def dmiCmEventsTopicField = CmAvcEventService.getDeclaredField('dmiCmAvcEventsTopic') + dmiCmEventsTopicField.accessible = true + dmiCmEventsTopicField.set(objectUnderTest, 'test-topic') + } + + def 'Produce cm avc event in case of write operation'() { + given: 'Data that is sent for write operation' + def cmHandleId = 'my-cm-handle' + and: 'mocking successful serialization' + byte[] expectedBytes = [1, 2, 3] + mockObjectMapper.writeValueAsBytes(_ as Object) >> expectedBytes + when: 'the event is sent' + objectUnderTest.sendCmAvcEvent(CREATE, cmHandleId, '/my/resource/path', '{"data":"my data"}') + then: 'the event with correct details is send' + 1 * mockKafkaTemplate.send('test-topic', cmHandleId, { CloudEvent event -> + event.getSource().toString() == 'ONAP-DMI-PLUGIN' && + event.getType().contains('AvcEvent') && + event.getData().toBytes() == expectedBytes + }) + } + + def 'Event is not sent when cloudEvent is null'() { + given: 'mocking failed serialization to throw exception resulting in null CloudEvent' + mockObjectMapper.writeValueAsBytes(_ as Object) >> { throw new JsonProcessingException('failed') {} } + when: 'cm avc event is sent' + objectUnderTest.sendCmAvcEvent(DELETE, 'some-cm-handle', '/some/resource', '{"data":"some data"}') + then: 'kafka template is not invoked as event is null' + 0 * mockKafkaTemplate.send(*_) + } +} diff --git a/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy b/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy index 8531d35f..bed4f0be 100644 --- a/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy +++ b/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021-2022 Nordix Foundation + * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved. * Modifications Copyright (C) 2021-2022 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,16 +24,17 @@ package org.onap.cps.ncmp.dmi.service import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectWriter +import org.onap.cps.ncmp.dmi.cmstack.avc.CmAvcEventService import org.onap.cps.ncmp.dmi.config.DmiPluginConfig import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException import org.onap.cps.ncmp.dmi.exception.DmiException +import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException -import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException -import org.onap.cps.ncmp.dmi.service.model.ModuleReference import org.onap.cps.ncmp.dmi.model.YangResource import org.onap.cps.ncmp.dmi.model.YangResources import org.onap.cps.ncmp.dmi.service.client.NcmpRestClient +import org.onap.cps.ncmp.dmi.service.model.ModuleReference import org.onap.cps.ncmp.dmi.service.model.ModuleSchema import org.onap.cps.ncmp.dmi.service.operation.SdncOperations import org.springframework.http.HttpStatus @@ -51,7 +52,8 @@ class DmiServiceImplSpec extends Specification { def spyObjectMapper = Spy(ObjectMapper) def mockObjectMapper = Mock(ObjectMapper) def mockSdncOperations = Mock(SdncOperations) - def objectUnderTest = new DmiServiceImpl(mockDmiPluginProperties, mockNcmpRestClient, mockSdncOperations, spyObjectMapper) + def mockCmAvcEventService = Mock(CmAvcEventService) + def objectUnderTest = new DmiServiceImpl(mockDmiPluginProperties, mockNcmpRestClient, mockSdncOperations, spyObjectMapper, mockCmAvcEventService) def 'Register cm handles with ncmp.'() { given: 'some cm-handle ids' @@ -252,13 +254,15 @@ class DmiServiceImplSpec extends Specification { def 'Write resource data with special characters.'() { given: 'sdnc returns a created response' - mockSdncOperations.writeData(CREATE, 'some-cmHandle', - 'some-resourceIdentifier', 'some-dataType', 'data with quote " and \n new line') >> new ResponseEntity('response json', HttpStatus.CREATED) + mockSdncOperations.writeData(CREATE, 'my-cmHandle', + 'my-resourceIdentifier', 'my-dataType', 'data with quote " and \n new line') >> new ResponseEntity('response json', HttpStatus.CREATED) when: 'resource data is written to sdnc' - def response = objectUnderTest.writeData(CREATE, 'some-cmHandle', - 'some-resourceIdentifier', 'some-dataType', 'data with quote " and \n new line') + def response = objectUnderTest.writeData(CREATE, 'my-cmHandle', + 'my-resourceIdentifier', 'my-dataType', 'data with quote " and \n new line') then: 'the response matches the expected data' - response == 'response json' + assert response == 'response json' + and: 'the cm avc event is sent to NCMP' + 1 * mockCmAvcEventService.sendCmAvcEvent(CREATE, 'my-cmHandle', 'my-resourceIdentifier', 'data with quote " and \n new line') } def 'Write resource data for passthrough running with a 500 response from sdnc.'() { @@ -269,5 +273,18 @@ class DmiServiceImplSpec extends Specification { 'some-resourceIdentifier', 'some-dataType', _ as String) then: 'a dmi exception is thrown' thrown(DmiException.class) + and : 'cm avc event is not sent' + 0 * mockCmAvcEventService.sendCmAvcEvent(*_) + } + + def 'Enabling data synchronization flag'() { + given: 'a list of cm handle ids' + def cmHandleIds = ['ch-1', 'ch-2'] + when: 'data sync is enabled for the cm handles' + objectUnderTest.enableNcmpDataSyncForCmHandles(cmHandleIds) + then: 'the data sync is enabled for each cm handle (over the REST interface)' + 1 * mockNcmpRestClient.enableNcmpDataSync('ch-1') + 1 * mockNcmpRestClient.enableNcmpDataSync('ch-2') + } } \ No newline at end of file -- 2.16.6