Cm Avc Event for passthrough write operation 27/141527/6
authormpriyank <priyank.maheshwari@est.tech>
Mon, 14 Jul 2025 09:32:23 +0000 (10:32 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Wed, 23 Jul 2025 09:27:06 +0000 (10:27 +0100)
- Cm Avc event is sent when there is a successful passthrough
  WRITE(create,update,delete) operation
- This event will be used by NCMP to update the cache
- Added testware for this scenario and also added a missing test for
  enabling data sync flag

Issue-ID: CPS-2877
Change-Id: I4ceb196a4598dac2c2560decbec0ab8ce088c610
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventService.java [new file with mode: 0644]
dmi-service/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java
dmi-service/src/main/resources/application.yml
dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventServiceSpec.groovy [new file with mode: 0644]
dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy

diff --git a/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventService.java b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventService.java
new file mode 100644 (file)
index 0000000..659c893
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.cmstack.avc;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.List;
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
+import org.onap.cps.ncmp.events.avc1_0_0.Data;
+import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges;
+import org.onap.cps.ncmp.events.avc1_0_0.Edit;
+import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch;
+import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class CmAvcEventService {
+
+    @Value("${app.dmi.avc.cm-avc-events-topic:dmi-cm-events}")
+    private String dmiCmAvcEventsTopic;
+
+    private final ObjectMapper objectMapper;
+    private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
+
+    /**
+     * Handle the event creation and sends event to NCMP.
+     *
+     * @param operation          Operation
+     * @param cmHandle           cm handle identifier
+     * @param resourceIdentifier resource identifier
+     * @param data               actual data in the form of JSON
+     */
+    public void sendCmAvcEvent(final DataAccessRequest.OperationEnum operation, final String cmHandle,
+            final String resourceIdentifier, final String data) {
+
+        final CloudEvent cmAvcEventAsCloudEvent =
+                convertAvcEventToCloudEvent(cmAvcEventsCreator(operation, resourceIdentifier, data));
+
+        if (cmAvcEventAsCloudEvent == null) {
+            log.warn("cmAvcEventAsCloudEvent is null, skipping send for cmHandle: {}", cmHandle);
+        } else {
+            cloudEventKafkaTemplate.send(dmiCmAvcEventsTopic, cmHandle, cmAvcEventAsCloudEvent);
+        }
+
+    }
+
+    private AvcEvent cmAvcEventsCreator(final DataAccessRequest.OperationEnum operation,
+            final String resourceIdentifier, final String jsonData) {
+        final AvcEvent avcEvent = new AvcEvent();
+        final Data data = new Data();
+        final PushChangeUpdate pushChangeUpdate = new PushChangeUpdate();
+        final DatastoreChanges datastoreChanges = new DatastoreChanges();
+        final IetfYangPatchYangPatch ietfYangPatchYangPatch = new IetfYangPatchYangPatch();
+        ietfYangPatchYangPatch.setPatchId(UUID.randomUUID().toString());
+        final Edit edit = new Edit();
+        edit.setEditId(UUID.randomUUID() + "-edit-id");
+        edit.setOperation(operation.getValue());
+        edit.setTarget(resourceIdentifier);
+        edit.setValue(jsonData);
+        ietfYangPatchYangPatch.setEdit(List.of(edit));
+        datastoreChanges.setIetfYangPatchYangPatch(ietfYangPatchYangPatch);
+        pushChangeUpdate.setDatastoreChanges(datastoreChanges);
+        data.setPushChangeUpdate(pushChangeUpdate);
+        avcEvent.setData(data);
+        return avcEvent;
+    }
+
+    private CloudEvent convertAvcEventToCloudEvent(final AvcEvent avcEvent) {
+
+        try {
+            return CloudEventBuilder.v1()
+                    .withId(UUID.randomUUID().toString())
+                    .withSource(URI.create("ONAP-DMI-PLUGIN"))
+                    .withType(AvcEvent.class.getName())
+                    .withDataSchema(URI.create("urn:cps:" + AvcEvent.class.getName() + ":1.0.0"))
+                    .withData(objectMapper.writeValueAsBytes(avcEvent))
+                    .build();
+        } catch (final JsonProcessingException jsonProcessingException) {
+            log.error("Unable to convert object to json : {}", jsonProcessingException.getMessage());
+        }
+
+        return null;
+
+    }
+
+}
index 6bc6cee..cfb0686 100644 (file)
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.cmstack.avc.CmAvcEventService;
 import org.onap.cps.ncmp.dmi.config.DmiPluginConfig.DmiPluginProperties;
 import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException;
 import org.onap.cps.ncmp.dmi.exception.DmiException;
@@ -59,6 +60,7 @@ public class DmiServiceImpl implements DmiService {
     private NcmpRestClient ncmpRestClient;
     private ObjectMapper objectMapper;
     private DmiPluginProperties dmiPluginProperties;
+    private CmAvcEventService cmAvcEventService;
 
     /**
      * Constructor.
@@ -67,14 +69,18 @@ public class DmiServiceImpl implements DmiService {
      * @param ncmpRestClient      ncmpRestClient
      * @param sdncOperations      sdncOperations
      * @param objectMapper        objectMapper
+     * @param cmAvcEventService   cmAvcEventService
      */
     public DmiServiceImpl(final DmiPluginProperties dmiPluginProperties,
-        final NcmpRestClient ncmpRestClient,
-        final SdncOperations sdncOperations, final ObjectMapper objectMapper) {
+                                    final NcmpRestClient ncmpRestClient,
+                                    final SdncOperations sdncOperations,
+                                    final ObjectMapper objectMapper,
+                                    final CmAvcEventService cmAvcEventService) {
         this.dmiPluginProperties = dmiPluginProperties;
         this.ncmpRestClient = ncmpRestClient;
         this.objectMapper = objectMapper;
         this.sdncOperations = sdncOperations;
+        this.cmAvcEventService = cmAvcEventService;
     }
 
     @Override
@@ -160,7 +166,7 @@ public class DmiServiceImpl implements DmiService {
             resourceIdentifier,
             optionsParamInQuery,
             restconfContentQueryParam);
-        return prepareAndSendResponse(responseEntity, cmHandle);
+        return checkDeviceResponse(responseEntity, cmHandle);
     }
 
     @Override
@@ -170,10 +176,12 @@ public class DmiServiceImpl implements DmiService {
                             final String dataType, final String data) {
         final ResponseEntity<String> responseEntity =
             sdncOperations.writeData(operation, cmHandle, resourceIdentifier, dataType, data);
-        return prepareAndSendResponse(responseEntity, cmHandle);
+        final String deviceResponse = checkDeviceResponse(responseEntity, cmHandle);
+        cmAvcEventService.sendCmAvcEvent(operation, cmHandle, resourceIdentifier, data);
+        return deviceResponse;
     }
 
-    private String prepareAndSendResponse(final ResponseEntity<String> responseEntity, final String cmHandle) {
+    private String checkDeviceResponse(final ResponseEntity<String> responseEntity, final String cmHandle) {
         if (responseEntity.getStatusCode().is2xxSuccessful()) {
             return responseEntity.getBody();
         } else {
index 7ba9b25..99f739a 100644 (file)
@@ -76,6 +76,7 @@ app:
     avc:
       cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
       cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
+      cm-avc-events-topic: ${DMI_CM_AVC_EVENTS:dmi-cm-events}
     ves:
       topicNames:
         - "unauthenticated.VES_PNFREG_OUTPUT"
diff --git a/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventServiceSpec.groovy b/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/avc/CmAvcEventServiceSpec.groovy
new file mode 100644 (file)
index 0000000..052bd1d
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.cmstack.avc
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import org.springframework.kafka.core.KafkaTemplate
+import spock.lang.Specification
+
+import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.CREATE
+import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.DELETE
+
+class CmAvcEventServiceSpec extends Specification {
+
+    def mockKafkaTemplate = Mock(KafkaTemplate)
+    def mockObjectMapper = Mock(ObjectMapper)
+    def objectUnderTest = new CmAvcEventService(mockObjectMapper, mockKafkaTemplate)
+
+    def setup() {
+        def dmiCmEventsTopicField = CmAvcEventService.getDeclaredField('dmiCmAvcEventsTopic')
+        dmiCmEventsTopicField.accessible = true
+        dmiCmEventsTopicField.set(objectUnderTest, 'test-topic')
+    }
+
+    def 'Produce cm avc event in case of write operation'() {
+        given: 'Data that is sent for write operation'
+            def cmHandleId = 'my-cm-handle'
+        and: 'mocking successful serialization'
+            byte[] expectedBytes = [1, 2, 3]
+            mockObjectMapper.writeValueAsBytes(_ as Object) >> expectedBytes
+        when: 'the event is sent'
+            objectUnderTest.sendCmAvcEvent(CREATE, cmHandleId, '/my/resource/path', '{"data":"my data"}')
+        then: 'the event with correct details is send'
+            1 * mockKafkaTemplate.send('test-topic', cmHandleId, { CloudEvent event ->
+                event.getSource().toString() == 'ONAP-DMI-PLUGIN' &&
+                    event.getType().contains('AvcEvent') &&
+                    event.getData().toBytes() == expectedBytes
+            })
+    }
+
+    def 'Event is not sent when cloudEvent is null'() {
+        given: 'mocking failed serialization to throw exception resulting in null CloudEvent'
+            mockObjectMapper.writeValueAsBytes(_ as Object) >> { throw new JsonProcessingException('failed') {} }
+        when: 'cm avc event is sent'
+            objectUnderTest.sendCmAvcEvent(DELETE, 'some-cm-handle', '/some/resource', '{"data":"some data"}')
+        then: 'kafka template is not invoked as event is null'
+            0 * mockKafkaTemplate.send(*_)
+    }
+}
index 8531d35..bed4f0b 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2022 Nordix Foundation
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2021-2022 Bell Canada
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,16 +24,17 @@ package org.onap.cps.ncmp.dmi.service
 import com.fasterxml.jackson.core.JsonProcessingException
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.databind.ObjectWriter
+import org.onap.cps.ncmp.dmi.cmstack.avc.CmAvcEventService
 import org.onap.cps.ncmp.dmi.config.DmiPluginConfig
 import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException
 import org.onap.cps.ncmp.dmi.exception.DmiException
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException
 import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException
 import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException
-import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException
-import org.onap.cps.ncmp.dmi.service.model.ModuleReference
 import org.onap.cps.ncmp.dmi.model.YangResource
 import org.onap.cps.ncmp.dmi.model.YangResources
 import org.onap.cps.ncmp.dmi.service.client.NcmpRestClient
+import org.onap.cps.ncmp.dmi.service.model.ModuleReference
 import org.onap.cps.ncmp.dmi.service.model.ModuleSchema
 import org.onap.cps.ncmp.dmi.service.operation.SdncOperations
 import org.springframework.http.HttpStatus
@@ -51,7 +52,8 @@ class DmiServiceImplSpec extends Specification {
     def spyObjectMapper = Spy(ObjectMapper)
     def mockObjectMapper = Mock(ObjectMapper)
     def mockSdncOperations = Mock(SdncOperations)
-    def objectUnderTest = new DmiServiceImpl(mockDmiPluginProperties, mockNcmpRestClient, mockSdncOperations, spyObjectMapper)
+    def mockCmAvcEventService = Mock(CmAvcEventService)
+    def objectUnderTest = new DmiServiceImpl(mockDmiPluginProperties, mockNcmpRestClient, mockSdncOperations, spyObjectMapper, mockCmAvcEventService)
 
     def 'Register cm handles with ncmp.'() {
         given: 'some cm-handle ids'
@@ -252,13 +254,15 @@ class DmiServiceImplSpec extends Specification {
 
     def 'Write resource data with special characters.'() {
         given: 'sdnc returns a created response'
-            mockSdncOperations.writeData(CREATE, 'some-cmHandle',
-                    'some-resourceIdentifier', 'some-dataType', 'data with quote " and \n new line') >> new ResponseEntity<String>('response json', HttpStatus.CREATED)
+            mockSdncOperations.writeData(CREATE, 'my-cmHandle',
+                    'my-resourceIdentifier', 'my-dataType', 'data with quote " and \n new line') >> new ResponseEntity<String>('response json', HttpStatus.CREATED)
         when: 'resource data is written to sdnc'
-            def response = objectUnderTest.writeData(CREATE, 'some-cmHandle',
-                    'some-resourceIdentifier', 'some-dataType', 'data with quote " and \n new line')
+            def response = objectUnderTest.writeData(CREATE, 'my-cmHandle',
+                    'my-resourceIdentifier', 'my-dataType', 'data with quote " and \n new line')
         then: 'the response matches the expected data'
-            response == 'response json'
+            assert response == 'response json'
+        and: 'the cm avc event is sent to NCMP'
+            1 * mockCmAvcEventService.sendCmAvcEvent(CREATE, 'my-cmHandle', 'my-resourceIdentifier', 'data with quote " and \n new line')
     }
 
     def 'Write resource data for passthrough running with a 500 response from sdnc.'() {
@@ -269,5 +273,18 @@ class DmiServiceImplSpec extends Specification {
                     'some-resourceIdentifier', 'some-dataType', _ as String)
         then: 'a dmi exception is thrown'
             thrown(DmiException.class)
+        and : 'cm avc event is not sent'
+            0 * mockCmAvcEventService.sendCmAvcEvent(*_)
+    }
+
+    def 'Enabling data synchronization flag'() {
+        given: 'a list of cm handle ids'
+            def cmHandleIds = ['ch-1', 'ch-2']
+        when: 'data sync is enabled for the cm handles'
+            objectUnderTest.enableNcmpDataSyncForCmHandles(cmHandleIds)
+        then: 'the data sync is enabled for each cm handle (over the REST interface)'
+            1 * mockNcmpRestClient.enableNcmpDataSync('ch-1')
+            1 * mockNcmpRestClient.enableNcmpDataSync('ch-2')
+
     }
 }
\ No newline at end of file