--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.cmstack.avc;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.List;
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
+import org.onap.cps.ncmp.events.avc1_0_0.Data;
+import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges;
+import org.onap.cps.ncmp.events.avc1_0_0.Edit;
+import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch;
+import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class CmAvcEventService {
+
+ @Value("${app.dmi.avc.cm-avc-events-topic:dmi-cm-events}")
+ private String dmiCmAvcEventsTopic;
+
+ private final ObjectMapper objectMapper;
+ private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
+
+ /**
+ * Handle the event creation and sends event to NCMP.
+ *
+ * @param operation Operation
+ * @param cmHandle cm handle identifier
+ * @param resourceIdentifier resource identifier
+ * @param data actual data in the form of JSON
+ */
+ public void sendCmAvcEvent(final DataAccessRequest.OperationEnum operation, final String cmHandle,
+ final String resourceIdentifier, final String data) {
+
+ final CloudEvent cmAvcEventAsCloudEvent =
+ convertAvcEventToCloudEvent(cmAvcEventsCreator(operation, resourceIdentifier, data));
+
+ if (cmAvcEventAsCloudEvent == null) {
+ log.warn("cmAvcEventAsCloudEvent is null, skipping send for cmHandle: {}", cmHandle);
+ } else {
+ cloudEventKafkaTemplate.send(dmiCmAvcEventsTopic, cmHandle, cmAvcEventAsCloudEvent);
+ }
+
+ }
+
+ private AvcEvent cmAvcEventsCreator(final DataAccessRequest.OperationEnum operation,
+ final String resourceIdentifier, final String jsonData) {
+ final AvcEvent avcEvent = new AvcEvent();
+ final Data data = new Data();
+ final PushChangeUpdate pushChangeUpdate = new PushChangeUpdate();
+ final DatastoreChanges datastoreChanges = new DatastoreChanges();
+ final IetfYangPatchYangPatch ietfYangPatchYangPatch = new IetfYangPatchYangPatch();
+ ietfYangPatchYangPatch.setPatchId(UUID.randomUUID().toString());
+ final Edit edit = new Edit();
+ edit.setEditId(UUID.randomUUID() + "-edit-id");
+ edit.setOperation(operation.getValue());
+ edit.setTarget(resourceIdentifier);
+ edit.setValue(jsonData);
+ ietfYangPatchYangPatch.setEdit(List.of(edit));
+ datastoreChanges.setIetfYangPatchYangPatch(ietfYangPatchYangPatch);
+ pushChangeUpdate.setDatastoreChanges(datastoreChanges);
+ data.setPushChangeUpdate(pushChangeUpdate);
+ avcEvent.setData(data);
+ return avcEvent;
+ }
+
+ private CloudEvent convertAvcEventToCloudEvent(final AvcEvent avcEvent) {
+
+ try {
+ return CloudEventBuilder.v1()
+ .withId(UUID.randomUUID().toString())
+ .withSource(URI.create("ONAP-DMI-PLUGIN"))
+ .withType(AvcEvent.class.getName())
+ .withDataSchema(URI.create("urn:cps:" + AvcEvent.class.getName() + ":1.0.0"))
+ .withData(objectMapper.writeValueAsBytes(avcEvent))
+ .build();
+ } catch (final JsonProcessingException jsonProcessingException) {
+ log.error("Unable to convert object to json : {}", jsonProcessingException.getMessage());
+ }
+
+ return null;
+
+ }
+
+}
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.cmstack.avc.CmAvcEventService;
import org.onap.cps.ncmp.dmi.config.DmiPluginConfig.DmiPluginProperties;
import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException;
import org.onap.cps.ncmp.dmi.exception.DmiException;
private NcmpRestClient ncmpRestClient;
private ObjectMapper objectMapper;
private DmiPluginProperties dmiPluginProperties;
+ private CmAvcEventService cmAvcEventService;
/**
* Constructor.
* @param ncmpRestClient ncmpRestClient
* @param sdncOperations sdncOperations
* @param objectMapper objectMapper
+ * @param cmAvcEventService cmAvcEventService
*/
public DmiServiceImpl(final DmiPluginProperties dmiPluginProperties,
- final NcmpRestClient ncmpRestClient,
- final SdncOperations sdncOperations, final ObjectMapper objectMapper) {
+ final NcmpRestClient ncmpRestClient,
+ final SdncOperations sdncOperations,
+ final ObjectMapper objectMapper,
+ final CmAvcEventService cmAvcEventService) {
this.dmiPluginProperties = dmiPluginProperties;
this.ncmpRestClient = ncmpRestClient;
this.objectMapper = objectMapper;
this.sdncOperations = sdncOperations;
+ this.cmAvcEventService = cmAvcEventService;
}
@Override
resourceIdentifier,
optionsParamInQuery,
restconfContentQueryParam);
- return prepareAndSendResponse(responseEntity, cmHandle);
+ return checkDeviceResponse(responseEntity, cmHandle);
}
@Override
final String dataType, final String data) {
final ResponseEntity<String> responseEntity =
sdncOperations.writeData(operation, cmHandle, resourceIdentifier, dataType, data);
- return prepareAndSendResponse(responseEntity, cmHandle);
+ final String deviceResponse = checkDeviceResponse(responseEntity, cmHandle);
+ cmAvcEventService.sendCmAvcEvent(operation, cmHandle, resourceIdentifier, data);
+ return deviceResponse;
}
- private String prepareAndSendResponse(final ResponseEntity<String> responseEntity, final String cmHandle) {
+ private String checkDeviceResponse(final ResponseEntity<String> responseEntity, final String cmHandle) {
if (responseEntity.getStatusCode().is2xxSuccessful()) {
return responseEntity.getBody();
} else {
avc:
cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
+ cm-avc-events-topic: ${DMI_CM_AVC_EVENTS:dmi-cm-events}
ves:
topicNames:
- "unauthenticated.VES_PNFREG_OUTPUT"
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.cmstack.avc
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import org.springframework.kafka.core.KafkaTemplate
+import spock.lang.Specification
+
+import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.CREATE
+import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.DELETE
+
+class CmAvcEventServiceSpec extends Specification {
+
+ def mockKafkaTemplate = Mock(KafkaTemplate)
+ def mockObjectMapper = Mock(ObjectMapper)
+ def objectUnderTest = new CmAvcEventService(mockObjectMapper, mockKafkaTemplate)
+
+ def setup() {
+ def dmiCmEventsTopicField = CmAvcEventService.getDeclaredField('dmiCmAvcEventsTopic')
+ dmiCmEventsTopicField.accessible = true
+ dmiCmEventsTopicField.set(objectUnderTest, 'test-topic')
+ }
+
+ def 'Produce cm avc event in case of write operation'() {
+ given: 'Data that is sent for write operation'
+ def cmHandleId = 'my-cm-handle'
+ and: 'mocking successful serialization'
+ byte[] expectedBytes = [1, 2, 3]
+ mockObjectMapper.writeValueAsBytes(_ as Object) >> expectedBytes
+ when: 'the event is sent'
+ objectUnderTest.sendCmAvcEvent(CREATE, cmHandleId, '/my/resource/path', '{"data":"my data"}')
+ then: 'the event with correct details is send'
+ 1 * mockKafkaTemplate.send('test-topic', cmHandleId, { CloudEvent event ->
+ event.getSource().toString() == 'ONAP-DMI-PLUGIN' &&
+ event.getType().contains('AvcEvent') &&
+ event.getData().toBytes() == expectedBytes
+ })
+ }
+
+ def 'Event is not sent when cloudEvent is null'() {
+ given: 'mocking failed serialization to throw exception resulting in null CloudEvent'
+ mockObjectMapper.writeValueAsBytes(_ as Object) >> { throw new JsonProcessingException('failed') {} }
+ when: 'cm avc event is sent'
+ objectUnderTest.sendCmAvcEvent(DELETE, 'some-cm-handle', '/some/resource', '{"data":"some data"}')
+ then: 'kafka template is not invoked as event is null'
+ 0 * mockKafkaTemplate.send(*_)
+ }
+}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2022 Nordix Foundation
+ * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
* Modifications Copyright (C) 2021-2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.ObjectWriter
+import org.onap.cps.ncmp.dmi.cmstack.avc.CmAvcEventService
import org.onap.cps.ncmp.dmi.config.DmiPluginConfig
import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException
import org.onap.cps.ncmp.dmi.exception.DmiException
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException
import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException
import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException
-import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException
-import org.onap.cps.ncmp.dmi.service.model.ModuleReference
import org.onap.cps.ncmp.dmi.model.YangResource
import org.onap.cps.ncmp.dmi.model.YangResources
import org.onap.cps.ncmp.dmi.service.client.NcmpRestClient
+import org.onap.cps.ncmp.dmi.service.model.ModuleReference
import org.onap.cps.ncmp.dmi.service.model.ModuleSchema
import org.onap.cps.ncmp.dmi.service.operation.SdncOperations
import org.springframework.http.HttpStatus
def spyObjectMapper = Spy(ObjectMapper)
def mockObjectMapper = Mock(ObjectMapper)
def mockSdncOperations = Mock(SdncOperations)
- def objectUnderTest = new DmiServiceImpl(mockDmiPluginProperties, mockNcmpRestClient, mockSdncOperations, spyObjectMapper)
+ def mockCmAvcEventService = Mock(CmAvcEventService)
+ def objectUnderTest = new DmiServiceImpl(mockDmiPluginProperties, mockNcmpRestClient, mockSdncOperations, spyObjectMapper, mockCmAvcEventService)
def 'Register cm handles with ncmp.'() {
given: 'some cm-handle ids'
def 'Write resource data with special characters.'() {
given: 'sdnc returns a created response'
- mockSdncOperations.writeData(CREATE, 'some-cmHandle',
- 'some-resourceIdentifier', 'some-dataType', 'data with quote " and \n new line') >> new ResponseEntity<String>('response json', HttpStatus.CREATED)
+ mockSdncOperations.writeData(CREATE, 'my-cmHandle',
+ 'my-resourceIdentifier', 'my-dataType', 'data with quote " and \n new line') >> new ResponseEntity<String>('response json', HttpStatus.CREATED)
when: 'resource data is written to sdnc'
- def response = objectUnderTest.writeData(CREATE, 'some-cmHandle',
- 'some-resourceIdentifier', 'some-dataType', 'data with quote " and \n new line')
+ def response = objectUnderTest.writeData(CREATE, 'my-cmHandle',
+ 'my-resourceIdentifier', 'my-dataType', 'data with quote " and \n new line')
then: 'the response matches the expected data'
- response == 'response json'
+ assert response == 'response json'
+ and: 'the cm avc event is sent to NCMP'
+ 1 * mockCmAvcEventService.sendCmAvcEvent(CREATE, 'my-cmHandle', 'my-resourceIdentifier', 'data with quote " and \n new line')
}
def 'Write resource data for passthrough running with a 500 response from sdnc.'() {
'some-resourceIdentifier', 'some-dataType', _ as String)
then: 'a dmi exception is thrown'
thrown(DmiException.class)
+ and : 'cm avc event is not sent'
+ 0 * mockCmAvcEventService.sendCmAvcEvent(*_)
+ }
+
+ def 'Enabling data synchronization flag'() {
+ given: 'a list of cm handle ids'
+ def cmHandleIds = ['ch-1', 'ch-2']
+ when: 'data sync is enabled for the cm handles'
+ objectUnderTest.enableNcmpDataSyncForCmHandles(cmHandleIds)
+ then: 'the data sync is enabled for each cm handle (over the REST interface)'
+ 1 * mockNcmpRestClient.enableNcmpDataSync('ch-1')
+ 1 * mockNcmpRestClient.enableNcmpDataSync('ch-2')
+
}
}
\ No newline at end of file