"type": "string"
}
},
+ "resourceIdentifier": {
+ "description": "The format of resource identifier depend on the associated DMI Plugin implementation. For ONAP DMI Plugin it will be RESTConf paths but it can really be anything.",
+ "type": "string"
+ },
+ "options": {
+ "description": "It is mandatory to add as key(s)=value(s)'. The format of options parameter depend on the associated DMI Plugin implementation.",
+ "type": "string"
+ },
"statusCode": {
"description": "which says success or failure (0-99) are for success and (100-199) are for failure",
"type": "string"
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.cps.ncmp.events:subscription-event-schema:1.0.0",
- "$ref": "#/definitions/SubscriptionEvent",
+ "$id": "urn:cps:org.onap.cps.ncmp.events:cm-subscription-dmi-in-event-schema:1.0.0",
+ "$ref": "#/definitions/CmSubscriptionDmiInEvent",
"definitions": {
- "SubscriptionEvent": {
+ "CmSubscriptionDmiInEvent": {
"description": "The payload for subscription event to be forwarded to dmi plugins.",
- "javaType": "org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent",
+ "javaType": "org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent",
"properties": {
"data": {
"properties": {
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.cps.ncmp.events:dmi-subscription-response-event-schema:1.0.0",
- "$ref": "#/definitions/SubscriptionEventResponse",
+ "$id": "urn:cps:org.onap.cps.ncmp.events:cm-subscription-dmi-out-event-schema:1.0.0",
+ "$ref": "#/definitions/CmSubscriptionDmiOutEvent",
"definitions": {
"SubscriptionStatus": {
"description": "The subscription status information",
],
"additionalProperties": false
},
- "SubscriptionEventResponse" : {
+ "CmSubscriptionDmiOutEvent" : {
"description": "The payload for subscription response event.",
"type": "object",
- "javaType": "org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse",
+ "javaType": "org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent",
"properties": {
"data": {
"type": "object",
{
- "$id": "urn:cps:org.onap.cps.ncmp.events:avc-subscription-event:1.0.0",
- "$ref": "#/definitions/SubscriptionEvent",
+ "$id": "urn:cps:org.onap.cps.ncmp.events:cm-subscription-ncmp-in-event:1.0.0",
+ "$ref": "#/definitions/CmSubscriptionNcmpInEvent",
"$schema": "https://json-schema.org/draft/2019-09/schema",
"definitions": {
- "SubscriptionEvent": {
+ "CmSubscriptionNcmpInEvent": {
"description": "The payload for subscription event.",
- "javaType": "org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent",
+ "javaType": "org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent",
"properties": {
"data": {
"properties": {
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.cps.ncmp.events:subscription-event-outcome-schema:1.0.0",
- "$ref": "#/definitions/SubscriptionEventOutcome",
+ "$id": "urn:cps:org.onap.cps.ncmp.events:cm-subscription-ncmp-out-event-schema:1.0.0",
+ "$ref": "#/definitions/CmSubscriptionNcmpOutEvent",
"definitions": {
- "SubscriptionEventOutcome": {
+ "CmSubscriptionNcmpOutEvent": {
"description": "The payload for avc subscription event outcome message.",
"type": "object",
- "javaType": "org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome",
+ "javaType": "org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent",
"additionalProperties": false,
"properties": {
"data": {
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
+
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
import com.hazelcast.map.IMap;
import io.cloudevents.CloudEvent;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
-import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventResponseCloudMapper;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent;
import org.onap.cps.spi.model.DataNode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
@Component
@Slf4j
@RequiredArgsConstructor
-public class SubscriptionEventResponseConsumer {
+public class CmSubscriptionDmiOutEventConsumer {
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
private final SubscriptionPersistence subscriptionPersistence;
- private final SubscriptionEventResponseMapper subscriptionEventResponseMapper;
- private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
+ private final CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper
+ cmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper;
+ private final CmSubscriptionNcmpOutEventPublisher cmSubscriptionNcmpOutEventPublisher;
@Value("${notification.enabled:true}")
private boolean notificationFeatureEnabled;
/**
* Consume subscription response event.
*
- * @param subscriptionEventResponseConsumerRecord the event to be consumed
+ * @param cmSubscriptionDmiOutConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeSubscriptionEventResponse(
- final ConsumerRecord<String, CloudEvent> subscriptionEventResponseConsumerRecord) {
- final CloudEvent cloudEvent = subscriptionEventResponseConsumerRecord.value();
- final String eventType = subscriptionEventResponseConsumerRecord.value().getType();
- final SubscriptionEventResponse subscriptionEventResponse =
- SubscriptionEventResponseCloudMapper.toSubscriptionEventResponse(cloudEvent);
- final String clientId = subscriptionEventResponse.getData().getClientId();
+ final ConsumerRecord<String, CloudEvent> cmSubscriptionDmiOutConsumerRecord) {
+ final CloudEvent cloudEvent = cmSubscriptionDmiOutConsumerRecord.value();
+ final String eventType = cmSubscriptionDmiOutConsumerRecord.value().getType();
+ final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent =
+ toTargetEvent(cloudEvent, CmSubscriptionDmiOutEvent.class);
+ final String clientId = cmSubscriptionDmiOutEvent.getData().getClientId();
log.info("subscription event response of clientId: {} is received.", clientId);
- final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
+ final String subscriptionName = cmSubscriptionDmiOutEvent.getData().getSubscriptionName();
final String subscriptionEventId = clientId + subscriptionName;
boolean createOutcomeResponse = false;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
- dmiNames.remove(subscriptionEventResponse.getData().getDmiName());
+ dmiNames.remove(cmSubscriptionDmiOutEvent.getData().getDmiName());
forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
}
if (subscriptionModelLoaderEnabled) {
- updateSubscriptionEvent(subscriptionEventResponse);
+ updateSubscriptionEvent(cmSubscriptionDmiOutEvent);
}
if (createOutcomeResponse
&& notificationFeatureEnabled
&& hasNoPendingCmHandles(clientId, subscriptionName)) {
- subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse, eventType);
+ cmSubscriptionNcmpOutEventPublisher.sendResponse(cmSubscriptionDmiOutEvent, eventType);
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
}
return true;
}
- private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) {
+ private void updateSubscriptionEvent(final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent) {
final YangModelSubscriptionEvent yangModelSubscriptionEvent =
- subscriptionEventResponseMapper
- .toYangModelSubscriptionEvent(subscriptionEventResponse);
+ cmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper
+ .toYangModelSubscriptionEvent(cmSubscriptionDmiOutEvent);
subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent);
}
}
\ No newline at end of file
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import java.util.List;
import java.util.Map;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfo;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfoDetail;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.AdditionalInfo;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.AdditionalInfoDetail;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent;
import org.onap.cps.spi.exceptions.DataValidationException;
@Mapper(componentModel = "spring")
-public interface SubscriptionOutcomeMapper {
+public interface CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper {
@Mapping(source = "data.subscriptionStatus", target = "data.additionalInfo",
qualifiedByName = "mapListOfSubscriptionStatusToAdditionalInfo")
- SubscriptionEventOutcome toSubscriptionEventOutcome(SubscriptionEventResponse subscriptionEventResponse);
+ CmSubscriptionNcmpOutEvent toCmSubscriptionNcmpOutEvent(CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent);
/**
* Maps list of SubscriptionStatus to an AdditionalInfo.
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import java.util.List;
import java.util.stream.Collectors;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
@Mapper(componentModel = "spring")
-public interface SubscriptionEventResponseMapper {
+public interface CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper {
@Mapping(source = "data.clientId", target = "clientId")
@Mapping(source = "data.subscriptionName", target = "subscriptionName")
@Mapping(source = "data.subscriptionStatus", target = "predicates.targetCmHandles",
qualifiedByName = "mapSubscriptionStatusToCmHandleTargets")
YangModelSubscriptionEvent toYangModelSubscriptionEvent(
- SubscriptionEventResponse subscriptionEventResponse);
+ CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent);
/**
* Maps SubscriptionStatus to list of TargetCmHandle.
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
+
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
+import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL;
+import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING;
import io.cloudevents.CloudEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
-import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventCloudMapper;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
-public class SubscriptionEventConsumer {
+public class CmSubscriptionNcmpInEventConsumer {
- private final SubscriptionEventForwarder subscriptionEventForwarder;
- private final SubscriptionEventMapper subscriptionEventMapper;
+ private final CmSubscriptionNcmpInEventForwarder cmSubscriptionNcmpInEventForwarder;
+ private final CmSubscriptionNcmpInEventMapper cmSubscriptionNcmpInEventMapper;
private final SubscriptionPersistence subscriptionPersistence;
@Value("${notification.enabled:true}")
public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
final String eventType = subscriptionEventConsumerRecord.value().getType();
- final SubscriptionEvent subscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(cloudEvent);
- final String eventDatastore = subscriptionEvent.getData().getPredicates().getDatastore();
- if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) {
+ final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent =
+ toTargetEvent(cloudEvent, CmSubscriptionNcmpInEvent.class);
+ final String eventDatastore = cmSubscriptionNcmpInEvent.getData().getPredicates().getDatastore();
+ if (!eventDatastore.equals(PASSTHROUGH_RUNNING.getDatastoreName()) || eventDatastore.equals(
+ PASSTHROUGH_OPERATIONAL.getDatastoreName())) {
throw new UnsupportedOperationException(
- "passthrough datastores are currently only supported for event subscriptions");
+ "passthrough datastores are currently only supported for event subscriptions");
}
- if ("CM".equals(subscriptionEvent.getData().getDataType().getDataCategory())) {
+ if ("CM".equals(cmSubscriptionNcmpInEvent.getData().getDataType().getDataCategory())) {
if (subscriptionModelLoaderEnabled) {
- persistSubscriptionEvent(subscriptionEvent);
+ persistSubscriptionEvent(cmSubscriptionNcmpInEvent);
}
if ("subscriptionCreated".equals(cloudEvent.getType())) {
log.info("Subscription for ClientID {} with name {} ...",
- subscriptionEvent.getData().getSubscription().getClientID(),
- subscriptionEvent.getData().getSubscription().getName());
+ cmSubscriptionNcmpInEvent.getData().getSubscription().getClientID(),
+ cmSubscriptionNcmpInEvent.getData().getSubscription().getName());
if (notificationFeatureEnabled) {
- subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent, eventType);
+ cmSubscriptionNcmpInEventForwarder.forwardCreateSubscriptionEvent(cmSubscriptionNcmpInEvent,
+ eventType);
}
}
} else {
}
}
- private void persistSubscriptionEvent(final SubscriptionEvent subscriptionEvent) {
+ private void persistSubscriptionEvent(final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent) {
final YangModelSubscriptionEvent yangModelSubscriptionEvent =
- subscriptionEventMapper.toYangModelSubscriptionEvent(subscriptionEvent);
+ cmSubscriptionNcmpInEventMapper.toYangModelSubscriptionEvent(cmSubscriptionNcmpInEvent);
subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent);
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import com.hazelcast.map.IMap;
import io.cloudevents.CloudEvent;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
+import org.onap.cps.ncmp.api.impl.utils.CmSubscriptionEventCloudMapper;
import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer;
-import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventCloudMapper;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.Data;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmHandle;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
-public class SubscriptionEventForwarder {
+public class CmSubscriptionNcmpInEventForwarder {
private final InventoryPersistence inventoryPersistence;
private final EventsPublisher<CloudEvent> eventsPublisher;
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
- private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
- private final SubscriptionEventMapper subscriptionEventMapper;
- private final ClientSubscriptionEventMapper clientSubscriptionEventMapper;
+ private final CmSubscriptionNcmpOutEventPublisher cmSubscriptionNcmpOutEventPublisher;
+ private final CmSubscriptionNcmpInEventMapper cmSubscriptionNcmpInEventMapper;
+ private final CmSubscriptionEventCloudMapper cmSubscriptionEventCloudMapper;
+ private final CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper
+ cmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper;
private final SubscriptionPersistence subscriptionPersistence;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@Value("${app.ncmp.avc.subscription-forward-topic-prefix}")
/**
* Forward subscription event.
*
- * @param subscriptionEvent the event to be forwarded
+ * @param cmSubscriptionNcmpInEvent the event to be forwarded
*/
- public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, final String eventType) {
- final List<String> cmHandleTargets = subscriptionEvent.getData().getPredicates().getTargets();
- if (cmHandleTargets == null || cmHandleTargets.isEmpty()
- || cmHandleTargets.stream().anyMatch(id -> (id).contains("*"))) {
+ public void forwardCreateSubscriptionEvent(final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent,
+ final String eventType) {
+ final List<String> cmHandleTargets = cmSubscriptionNcmpInEvent.getData().getPredicates().getTargets();
+ if (cmHandleTargets == null || cmHandleTargets.isEmpty() || cmHandleTargets.stream()
+ .anyMatch(id -> (id).contains("*"))) {
throw new UnsupportedOperationException(
"CMHandle targets are required. \"Wildcard\" operations are not yet supported");
}
final Collection<YangModelCmHandle> yangModelCmHandles =
inventoryPersistence.getYangModelCmHandles(cmHandleTargets);
- final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName
- = DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
- findDmisAndRespond(subscriptionEvent, eventType, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName);
+ final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName =
+ DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
+ findDmisAndRespond(cmSubscriptionNcmpInEvent, eventType, cmHandleTargets,
+ dmiPropertiesPerCmHandleIdPerServiceName);
}
- private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final String eventType,
- final List<String> cmHandleTargetsAsStrings,
- final Map<String, Map<String, Map<String, String>>>
- dmiPropertiesPerCmHandleIdPerServiceName) {
- final SubscriptionEventResponse emptySubscriptionEventResponse =
- new SubscriptionEventResponse().withData(new Data());
- emptySubscriptionEventResponse.getData().setSubscriptionName(
- subscriptionEvent.getData().getSubscription().getName());
- emptySubscriptionEventResponse.getData().setClientId(
- subscriptionEvent.getData().getSubscription().getClientID());
- final List<String> cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream()
- .map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList());
+ private void findDmisAndRespond(final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent, final String eventType,
+ final List<String> cmHandleTargetsAsStrings,
+ final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName) {
+ final CmSubscriptionDmiOutEvent emptyCmSubscriptionDmiOutEvent =
+ new CmSubscriptionDmiOutEvent().withData(new Data());
+ emptyCmSubscriptionDmiOutEvent.getData()
+ .setSubscriptionName(cmSubscriptionNcmpInEvent.getData().getSubscription().getName());
+ emptyCmSubscriptionDmiOutEvent.getData()
+ .setClientId(cmSubscriptionNcmpInEvent.getData().getSubscription().getClientID());
+ final List<String> cmHandlesThatExistsInDb =
+ dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream().map(Map.Entry::getValue).map(Map::keySet)
+ .flatMap(Set::stream).collect(Collectors.toList());
final List<String> targetCmHandlesDoesNotExistInDb = new ArrayList<>(cmHandleTargetsAsStrings);
targetCmHandlesDoesNotExistInDb.removeAll(cmHandlesThatExistsInDb);
final Set<String> dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet());
if (dmisToRespond.isEmpty() || !targetCmHandlesDoesNotExistInDb.isEmpty()) {
- updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb);
+ updatesCmHandlesToRejectedAndPersistSubscriptionEvent(cmSubscriptionNcmpInEvent,
+ targetCmHandlesDoesNotExistInDb);
}
if (dmisToRespond.isEmpty()) {
- subscriptionEventResponseOutcome.sendResponse(emptySubscriptionEventResponse,
+ cmSubscriptionNcmpOutEventPublisher.sendResponse(emptyCmSubscriptionDmiOutEvent,
"subscriptionCreatedStatus");
} else {
- startResponseTimeout(emptySubscriptionEventResponse, dmisToRespond);
- final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent =
- clientSubscriptionEventMapper.toNcmpSubscriptionEvent(subscriptionEvent);
- forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent, eventType);
+ startResponseTimeout(emptyCmSubscriptionDmiOutEvent, dmisToRespond);
+ final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent =
+ cmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper.toCmSubscriptionDmiInEvent(
+ cmSubscriptionNcmpInEvent);
+ forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, cmSubscriptionDmiInEvent, eventType);
}
}
- private void startResponseTimeout(final SubscriptionEventResponse emptySubscriptionEventResponse,
+ private void startResponseTimeout(final CmSubscriptionDmiOutEvent emptyCmSubscriptionDmiOutEvent,
final Set<String> dmisToRespond) {
- final String subscriptionClientId = emptySubscriptionEventResponse.getData().getClientId();
- final String subscriptionName = emptySubscriptionEventResponse.getData().getSubscriptionName();
+ final String subscriptionClientId = emptyCmSubscriptionDmiOutEvent.getData().getClientId();
+ final String subscriptionName = emptyCmSubscriptionDmiOutEvent.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond,
ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
final ResponseTimeoutTask responseTimeoutTask =
- new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome,
- emptySubscriptionEventResponse);
- try {
- executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
- } catch (final RuntimeException ex) {
- log.info("Caught exception in ScheduledExecutorService for ResponseTimeoutTask. StackTrace: {}",
- ex.toString());
- }
+ new ResponseTimeoutTask(forwardedSubscriptionEventCache, cmSubscriptionNcmpOutEventPublisher,
+ emptyCmSubscriptionDmiOutEvent);
+
+ executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
}
private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap,
- final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent
- ncmpSubscriptionEvent, final String eventType) {
+ final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent, final String eventType) {
dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> {
final List<CmHandle> cmHandleTargets = cmHandlePropertiesMap.entrySet().stream().map(
cmHandleAndProperties -> {
return cmHandle;
}).collect(Collectors.toList());
- ncmpSubscriptionEvent.getData().getPredicates().setTargets(cmHandleTargets);
- final String eventKey = createEventKey(ncmpSubscriptionEvent, dmiName);
+ cmSubscriptionDmiInEvent.getData().getPredicates().setTargets(cmHandleTargets);
+ final String eventKey = createEventKey(cmSubscriptionDmiInEvent, dmiName);
final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName;
- final CloudEvent ncmpSubscriptionCloudEvent =
- SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey, eventType);
- eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, ncmpSubscriptionCloudEvent);
+ final CloudEvent cmSubscriptionDmiInCloudEvent =
+ cmSubscriptionEventCloudMapper.toCloudEvent(cmSubscriptionDmiInEvent, eventKey, eventType);
+ eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, cmSubscriptionDmiInCloudEvent);
});
}
- private String createEventKey(
- final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent subscriptionEvent,
- final String dmiName) {
- return subscriptionEvent.getData().getSubscription().getClientID()
- + "-"
- + subscriptionEvent.getData().getSubscription().getName()
- + "-"
- + dmiName;
+ private String createEventKey(final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent, final String dmiName) {
+ return cmSubscriptionDmiInEvent.getData().getSubscription().getClientID() + "-"
+ + cmSubscriptionDmiInEvent.getData().getSubscription().getName() + "-" + dmiName;
}
private void updatesCmHandlesToRejectedAndPersistSubscriptionEvent(
- final SubscriptionEvent subscriptionEvent,
+ final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent,
final List<String> targetCmHandlesDoesNotExistInDb) {
final YangModelSubscriptionEvent yangModelSubscriptionEvent =
- subscriptionEventMapper.toYangModelSubscriptionEvent(subscriptionEvent);
+ cmSubscriptionNcmpInEventMapper.toYangModelSubscriptionEvent(cmSubscriptionNcmpInEvent);
yangModelSubscriptionEvent.getPredicates()
- .setTargetCmHandles(findRejectedCmHandles(targetCmHandlesDoesNotExistInDb,
- yangModelSubscriptionEvent));
+ .setTargetCmHandles(findRejectedCmHandles(targetCmHandlesDoesNotExistInDb, yangModelSubscriptionEvent));
subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent);
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import java.util.List;
import java.util.stream.Collectors;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent;
@Mapper(componentModel = "spring")
-public interface SubscriptionEventMapper {
+public interface CmSubscriptionNcmpInEventMapper {
@Mapping(source = "data.subscription.clientID", target = "clientId")
@Mapping(source = "data.subscription.name", target = "subscriptionName")
@Mapping(source = "data.predicates.targets", target = "predicates.targetCmHandles",
qualifiedByName = "mapTargetsToCmHandleTargets")
@Mapping(source = "data.predicates.datastore", target = "predicates.datastore")
- YangModelSubscriptionEvent toYangModelSubscriptionEvent(SubscriptionEvent subscriptionEvent);
+ YangModelSubscriptionEvent toYangModelSubscriptionEvent(CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent);
/**
* Maps list of Targets to list of TargetCmHandle.
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent;
@Mapper(componentModel = "spring")
-public interface ClientSubscriptionEventMapper {
+public interface CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper {
@Mapping(target = "data.predicates.targets", ignore = true)
- org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent toNcmpSubscriptionEvent(
- SubscriptionEvent subscriptionEvent);
+ CmSubscriptionDmiInEvent toCmSubscriptionDmiInEvent(
+ CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent);
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import io.cloudevents.CloudEvent;
import java.util.List;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
import org.onap.cps.ncmp.api.impl.utils.SubscriptionOutcomeCloudMapper;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
-public class SubscriptionEventResponseOutcome {
+public class CmSubscriptionNcmpOutEventPublisher {
private final SubscriptionPersistence subscriptionPersistence;
private final EventsPublisher<CloudEvent> outcomeEventsPublisher;
- private final SubscriptionOutcomeMapper subscriptionOutcomeMapper;
+ private final CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper
+ cmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper;
+
+ private final SubscriptionOutcomeCloudMapper subscriptionOutcomeCloudMapper;
@Value("${app.ncmp.avc.subscription-outcome-topic:subscription-response}")
private String subscriptionOutcomeEventTopic;
/**
* This is for construction of outcome message to be published for client apps.
*
- * @param subscriptionEventResponse event produced by Dmi Plugin
+ * @param cmSubscriptionDmiOutEvent event produced by Dmi Plugin
*/
- public void sendResponse(final SubscriptionEventResponse subscriptionEventResponse, final String eventKey) {
- final SubscriptionEventOutcome subscriptionEventOutcome =
- formSubscriptionOutcomeMessage(subscriptionEventResponse);
- final String subscriptionClientId = subscriptionEventResponse.getData().getClientId();
- final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
+ public void sendResponse(final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent, final String eventKey) {
+ final CmSubscriptionNcmpOutEvent cmSubscriptionNcmpOutEvent =
+ formCmSubscriptionNcmpOutEvent(cmSubscriptionDmiOutEvent);
+ final String subscriptionClientId = cmSubscriptionDmiOutEvent.getData().getClientId();
+ final String subscriptionName = cmSubscriptionDmiOutEvent.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
final CloudEvent subscriptionOutcomeCloudEvent =
- SubscriptionOutcomeCloudMapper.toCloudEvent(subscriptionEventOutcome,
+ subscriptionOutcomeCloudMapper.toCloudEvent(cmSubscriptionNcmpOutEvent,
subscriptionEventId, eventKey);
outcomeEventsPublisher.publishCloudEvent(subscriptionOutcomeEventTopic,
subscriptionEventId, subscriptionOutcomeCloudEvent);
}
- private SubscriptionEventOutcome formSubscriptionOutcomeMessage(
- final SubscriptionEventResponse subscriptionEventResponse) {
+ private CmSubscriptionNcmpOutEvent formCmSubscriptionNcmpOutEvent(
+ final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent) {
final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap =
DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(
- subscriptionPersistence.getCmHandlesForSubscriptionEvent(
- subscriptionEventResponse.getData().getClientId(),
- subscriptionEventResponse.getData().getSubscriptionName()));
- final List<org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus>
- subscriptionStatusList = mapCmHandleIdStatusDetailsMapToSubscriptionStatusList(
- cmHandleIdToStatusAndDetailsAsMap);
- subscriptionEventResponse.getData().setSubscriptionStatus(subscriptionStatusList);
- return fromSubscriptionEventResponse(subscriptionEventResponse,
+ subscriptionPersistence.getCmHandlesForSubscriptionEvent(
+ cmSubscriptionDmiOutEvent.getData().getClientId(),
+ cmSubscriptionDmiOutEvent.getData().getSubscriptionName()));
+ final List<org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus>
+ subscriptionStatusList =
+ mapCmHandleIdStatusDetailsMapToSubscriptionStatusList(cmHandleIdToStatusAndDetailsAsMap);
+ cmSubscriptionDmiOutEvent.getData().setSubscriptionStatus(subscriptionStatusList);
+ return fromDmiOutEvent(cmSubscriptionDmiOutEvent,
decideOnNcmpEventResponseCodeForSubscription(cmHandleIdToStatusAndDetailsAsMap));
}
- private static List<org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus>
+ private static List<org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus>
mapCmHandleIdStatusDetailsMapToSubscriptionStatusList(
final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap) {
return cmHandleIdToStatusAndDetailsAsMap.entrySet()
.stream().map(entryset -> {
- final org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
- subscriptionStatus = new org.onap.cps.ncmp.events.avcsubscription1_0_0
+ final org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
+ subscriptionStatus = new org.onap.cps.ncmp.events.cmsubscription1_0_0
.dmi_to_ncmp.SubscriptionStatus();
final String cmHandleId = entryset.getKey();
final Map<String, String> statusAndDetailsMap = entryset.getValue();
final String details = statusAndDetailsMap.get("details");
subscriptionStatus.setId(cmHandleId);
subscriptionStatus.setStatus(
- org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp
+ org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp
.SubscriptionStatus.Status.fromValue(status));
subscriptionStatus.setDetails(details);
return subscriptionStatus;
.allMatch(entryset -> entryset.containsValue(subscriptionStatus.toString()));
}
- private SubscriptionEventOutcome fromSubscriptionEventResponse(
- final SubscriptionEventResponse subscriptionEventResponse,
+ private CmSubscriptionNcmpOutEvent fromDmiOutEvent(
+ final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent,
final NcmpEventResponseCode ncmpEventResponseCode) {
- final SubscriptionEventOutcome subscriptionEventOutcome =
- subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse);
- subscriptionEventOutcome.getData().setStatusCode(Integer.parseInt(ncmpEventResponseCode.getStatusCode()));
- subscriptionEventOutcome.getData().setStatusMessage(ncmpEventResponseCode.getStatusMessage());
+ final CmSubscriptionNcmpOutEvent cmSubscriptionNcmpOutEvent =
+ cmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper.toCmSubscriptionNcmpOutEvent(
+ cmSubscriptionDmiOutEvent);
+ cmSubscriptionNcmpOutEvent.getData().setStatusCode(Integer.parseInt(ncmpEventResponseCode.getStatusCode()));
+ cmSubscriptionNcmpOutEvent.getData().setStatusMessage(ncmpEventResponseCode.getStatusMessage());
- return subscriptionEventOutcome;
+ return cmSubscriptionNcmpOutEvent;
}
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import com.hazelcast.map.IMap;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent;
@Slf4j
@RequiredArgsConstructor
public class ResponseTimeoutTask implements Runnable {
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
- private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
- private final SubscriptionEventResponse subscriptionEventResponse;
+ private final CmSubscriptionNcmpOutEventPublisher cmSubscriptionNcmpOutEventPublisher;
+ private final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent;
@Override
public void run() {
-
- try {
- generateAndSendResponse();
- } catch (final Exception exception) {
- log.info("Caught exception in Runnable for ResponseTimeoutTask. StackTrace: {}",
- exception.toString());
- }
-
+ generateTimeoutResponse();
}
- private void generateAndSendResponse() {
- final String subscriptionClientId = subscriptionEventResponse.getData().getClientId();
- final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName();
+ private void generateTimeoutResponse() {
+ final String subscriptionClientId = cmSubscriptionDmiOutEvent.getData().getClientId();
+ final String subscriptionName = cmSubscriptionDmiOutEvent.getData().getSubscriptionName();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
- subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse,
+ cmSubscriptionNcmpOutEventPublisher.sendResponse(cmSubscriptionDmiOutEvent,
"subscriptionCreatedStatus");
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.mapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.CloudEventUtils;
+import io.cloudevents.core.data.PojoCloudEventData;
+import io.cloudevents.jackson.PojoCloudEventDataMapper;
+import io.cloudevents.rw.CloudEventRWException;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CloudEventMapper {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Generic method to map cloud event data to target event class object.
+ *
+ * @param cloudEvent input cloud event
+ * @param targetEventClass target event class
+ * @param <T> target event class type
+ * @return mapped target event
+ */
+ public static <T> T toTargetEvent(final CloudEvent cloudEvent, final Class<T> targetEventClass) {
+ PojoCloudEventData<T> mappedCloudEvent = null;
+
+ try {
+ mappedCloudEvent =
+ CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, targetEventClass));
+
+ } catch (final CloudEventRWException cloudEventRwException) {
+ log.error("Unable to map cloud event to target event class type : {} with cause : {}", targetEventClass,
+ cloudEventRwException.getMessage());
+ }
+
+ return mappedCloudEvent == null ? null : mappedCloudEvent.getValue();
+ }
+
+}
package org.onap.cps.ncmp.api.impl.operations;
import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.util.ArrayList;
import java.util.List;
@JsonPropertyOrder({"operation", "operationId", "datastore", "options", "resourceIdentifier", "cmHandles"})
public class DmiDataOperation {
- @JsonProperty("operation")
- private OperationType operationType;
+ private OperationType operation;
private String operationId;
private String datastore;
private String options;
final DataOperationDefinition dataOperationDefinition) {
return DmiDataOperation.builder()
- .operationType(OperationType.fromOperationName(dataOperationDefinition.getOperation()))
+ .operation(OperationType.fromOperationName(dataOperationDefinition.getOperation()))
.operationId(dataOperationDefinition.getOperationId())
.datastore(DatastoreType.fromDatastoreName(dataOperationDefinition.getDatastore()).getDatastoreName())
.options(dataOperationDefinition.getOptions())
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2020 Pantheon.tech
- * Modifications Copyright (C) 2020 Bell Canada
- * Modifications Copyright (C) 2020-2023 Nordix Foundation
+ * Copyright (C) 2023 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.utils;
+package org.onap.cps.ncmp.api.impl.operations;
-import org.onap.cps.spi.exceptions.CpsException;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.util.List;
+import lombok.Builder;
+import lombok.Getter;
-public class CloudEventConstructionException extends CpsException {
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Getter
+@Builder
+public class DmiDataOperationRequest {
- private static final long serialVersionUID = 7747941311132087621L;
+ private List<DmiDataOperation> operations;
- /**
- * Constructor.
- *
- * @param message the error message
- * @param details the error details
- * @param cause the error cause
- */
- public CloudEventConstructionException(final String message, final String details, final Throwable cause) {
- super(message, details, cause);
- }
}
private void sendDataOperationRequestToDmiService(final String dataOperationResourceUrl,
final List<DmiDataOperation> dmiDataOperationRequestBodies) {
- final String dataOperationRequestBodiesAsJsonString =
- jsonObjectMapper.asJsonString(dmiDataOperationRequestBodies);
+ final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder()
+ .operations(dmiDataOperationRequestBodies).build();
+ final String dmiDataOperationRequestAsJsonString =
+ jsonObjectMapper.asJsonString(dmiDataOperationRequest);
TaskExecutor.executeTask(() -> dmiRestClient.postOperationWithJsonData(dataOperationResourceUrl,
- dataOperationRequestBodiesAsJsonString, READ),
+ dmiDataOperationRequestAsJsonString, READ),
DEFAULT_ASYNC_TASK_EXECUTOR_TIMEOUT_IN_MILLISECONDS)
.whenCompleteAsync((response, throwable) -> handleTaskCompletionException(throwable,
dataOperationResourceUrl, dmiDataOperationRequestBodies));
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class CmSubscriptionEventCloudMapper {
+
+ private final ObjectMapper objectMapper;
+
+ private static String randomId = UUID.randomUUID().toString();
+
+ /**
+ * Maps CmSubscriptionDmiInEvent to a CloudEvent.
+ *
+ * @param cmSubscriptionDmiInEvent object.
+ * @param eventKey as String.
+ * @return CloudEvent built.
+ */
+ public CloudEvent toCloudEvent(final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent, final String eventKey,
+ final String eventType) {
+ try {
+ return CloudEventBuilder.v1().withId(randomId)
+ .withSource(URI.create(cmSubscriptionDmiInEvent.getData().getSubscription().getClientID()))
+ .withType(eventType).withExtension("correlationid", eventKey)
+ .withDataSchema(URI.create("urn:cps:" + CmSubscriptionDmiInEvent.class.getName() + ":1.0.0"))
+ .withData(objectMapper.writeValueAsBytes(cmSubscriptionDmiInEvent)).build();
+ } catch (final JsonProcessingException jsonProcessingException) {
+ log.error("The Cloud Event could not be constructed", jsonProcessingException);
+ }
+ return null;
+ }
+}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.utils;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.cloudevents.CloudEvent;
-import io.cloudevents.core.CloudEventUtils;
-import io.cloudevents.core.builder.CloudEventBuilder;
-import io.cloudevents.core.data.PojoCloudEventData;
-import io.cloudevents.jackson.PojoCloudEventDataMapper;
-import java.net.URI;
-import java.util.UUID;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
-
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Slf4j
-public class SubscriptionEventCloudMapper {
-
- private static final ObjectMapper objectMapper = new ObjectMapper();
-
- private static String randomId = UUID.randomUUID().toString();
-
- /**
- * Maps CloudEvent object to SubscriptionEvent.
- *
- * @param cloudEvent object.
- * @return SubscriptionEvent deserialized.
- */
- public static SubscriptionEvent toSubscriptionEvent(final CloudEvent cloudEvent) {
- final PojoCloudEventData<SubscriptionEvent> deserializedCloudEvent = CloudEventUtils
- .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, SubscriptionEvent.class));
- if (deserializedCloudEvent == null) {
- log.debug("No data found in the consumed event");
- return null;
- } else {
- final SubscriptionEvent subscriptionEvent = deserializedCloudEvent.getValue();
- log.debug("Consuming event {}", subscriptionEvent);
- return subscriptionEvent;
- }
- }
-
- /**
- * Maps SubscriptionEvent to a CloudEvent.
- *
- * @param ncmpSubscriptionEvent object.
- * @param eventKey as String.
- * @return CloudEvent built.
- */
- public static CloudEvent toCloudEvent(
- final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent,
- final String eventKey, final String eventType) {
- try {
- return CloudEventBuilder.v1()
- .withId(randomId)
- .withSource(URI.create(ncmpSubscriptionEvent.getData().getSubscription().getClientID()))
- .withType(eventType)
- .withExtension("correlationid", eventKey)
- .withDataSchema(URI.create("urn:cps:"
- + org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi
- .SubscriptionEvent.class.getName() + ":1.0.0"))
- .withData(objectMapper.writeValueAsBytes(ncmpSubscriptionEvent)).build();
- } catch (final Exception ex) {
- throw new CloudEventConstructionException("The Cloud Event could not be constructed", "Invalid object to "
- + "serialize or required headers is missing", ex);
- }
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.utils;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.cloudevents.CloudEvent;
-import io.cloudevents.core.CloudEventUtils;
-import io.cloudevents.core.data.PojoCloudEventData;
-import io.cloudevents.jackson.PojoCloudEventDataMapper;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
-
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Slf4j
-public class SubscriptionEventResponseCloudMapper {
-
- private static final ObjectMapper objectMapper = new ObjectMapper();
-
- /**
- * Maps CloudEvent object to SubscriptionEventResponse.
- *
- * @param cloudEvent object
- * @return SubscriptionEventResponse deserialized
- */
- public static SubscriptionEventResponse toSubscriptionEventResponse(final CloudEvent cloudEvent) {
- final PojoCloudEventData<SubscriptionEventResponse> deserializedCloudEvent = CloudEventUtils
- .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, SubscriptionEventResponse.class));
- if (deserializedCloudEvent == null) {
- log.debug("No data found in the consumed subscription response event");
- return null;
- } else {
- final SubscriptionEventResponse subscriptionEventResponse = deserializedCloudEvent.getValue();
- log.debug("Consuming subscription response event {}", subscriptionEventResponse);
- return subscriptionEventResponse;
- }
- }
-}
package org.onap.cps.ncmp.api.impl.utils;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.util.UUID;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent;
+import org.springframework.stereotype.Component;
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
+@Component
+@RequiredArgsConstructor
public class SubscriptionOutcomeCloudMapper {
- private static final ObjectMapper objectMapper = new ObjectMapper();
+ private final ObjectMapper objectMapper;
private static String randomId = UUID.randomUUID().toString();
/**
- * Maps SubscriptionEventOutcome to a CloudEvent.
+ * Maps CmSubscriptionNcmpOutEvent to a CloudEvent.
*
- * @param subscriptionEventOutcome object
+ * @param cmSubscriptionNcmpOutEvent object
* @return CloudEvent
*/
- public static CloudEvent toCloudEvent(final SubscriptionEventOutcome subscriptionEventOutcome,
+ public CloudEvent toCloudEvent(final CmSubscriptionNcmpOutEvent cmSubscriptionNcmpOutEvent,
final String eventKey, final String eventType) {
try {
return CloudEventBuilder.v1()
.withSource(URI.create("NCMP"))
.withType(eventType)
.withExtension("correlationid", eventKey)
- .withDataSchema(URI.create("urn:cps:" + SubscriptionEventOutcome.class.getName() + ":1.0.0"))
- .withData(objectMapper.writeValueAsBytes(subscriptionEventOutcome)).build();
- } catch (final Exception ex) {
- throw new CloudEventConstructionException("The Cloud Event could not be constructed", "Invalid object to "
- + "serialize or required headers is missing", ex);
+ .withDataSchema(URI.create("urn:cps:" + CmSubscriptionNcmpOutEvent.class.getName() + ":1.0.0"))
+ .withData(objectMapper.writeValueAsBytes(cmSubscriptionNcmpOutEvent)).build();
+ } catch (final JsonProcessingException jsonProcessingException) {
+ log.error("The Cloud Event could not be constructed", jsonProcessingException);
}
+ return null;
}
}
import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.CloudEventSerializer
import io.cloudevents.kafka.impl.KafkaHeaders
-import io.cloudevents.core.CloudEventUtils
import io.cloudevents.core.builder.CloudEventBuilder
-import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.header.internals.RecordHeaders
import org.testcontainers.spock.Testcontainers
import java.time.Duration
-@SpringBootTest(classes = [EventsPublisher, DataOperationEventConsumer, RecordFilterStrategies,JsonObjectMapper, ObjectMapper])
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
+
+@SpringBootTest(classes = [EventsPublisher, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
@Testcontainers
@DirtiesContext
class DataOperationEventConsumerSpec extends MessagingBaseSpec {
@Autowired
RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy
- @Autowired
- ObjectMapper objectMapper
-
def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
def static clientTopic = 'client-topic'
def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
and: 'verify that extension is included into header'
assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
and: 'map consumer record to expected event type'
- def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
- PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
+ def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
and: 'verify published response data properties'
def response = dataOperationResponseEvent.data.responses[0]
response.operationId == 'some-operation-id'
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
-import io.cloudevents.core.CloudEventUtils
import io.cloudevents.core.builder.CloudEventBuilder
-import io.cloudevents.jackson.PojoCloudEventDataMapper
import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.testcontainers.spock.Testcontainers
import java.time.Duration
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
+
@SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
@Testcontainers
@DirtiesContext
and: 'record can be converted to AVC event'
def record = records.iterator().next()
def cloudEvent = record.value() as CloudEvent
- def convertedAvcEvent = CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), AvcEvent.class)).getValue()
+ def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class)
and: 'we have correct headers forwarded where correlation id matches'
assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
and: 'event id differs(as per requirement) between consumed and forwarded'
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import org.mapstruct.factory.Mappers
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import spock.lang.Specification
@SpringBootTest(classes = [JsonObjectMapper, ObjectMapper])
-class ClientSubscriptionEventMapperSpec extends Specification {
+class ClientCmSubscriptionNcmpInEventMapperSpec extends Specification {
- ClientSubscriptionEventMapper objectUnderTest = Mappers.getMapper(ClientSubscriptionEventMapper)
+ CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper objectUnderTest = Mappers.getMapper(CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper)
@Autowired
JsonObjectMapper jsonObjectMapper
def 'Map clients subscription event to ncmps subscription event'() {
given: 'a Subscription Event'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
- def testEventToMap = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpInEvent.json')
+ def testEventToMap = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class)
when: 'the client event is mapped to a ncmp subscription event'
- def result = objectUnderTest.toNcmpSubscriptionEvent(testEventToMap)
+ def result = objectUnderTest.toCmSubscriptionDmiInEvent(testEventToMap)
then: 'the resulting ncmp subscription event contains the correct clientId'
assert result.getData().getSubscription().getClientID() == "SCO-9989752"
and: 'subscription name'
and: 'predicate targets is null'
assert result.getData().getPredicates().getTargets() == []
and: 'datastore is passthrough-running'
- assert result.getData().getPredicates().getDatastore() == 'passthrough-running'
+ assert result.getData().getPredicates().getDatastore() == 'ncmp-datastore:passthrough-running'
}
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import com.hazelcast.map.IMap
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistenceImpl
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.spi.model.DataNodeBuilder
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.boot.test.context.SpringBootTest
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec {
+class CmSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpec {
@Autowired
JsonObjectMapper jsonObjectMapper
IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
def mockSubscriptionPersistence = Mock(SubscriptionPersistenceImpl)
- def mockSubscriptionEventResponseMapper = Mock(SubscriptionEventResponseMapper)
- def mockSubscriptionEventResponseOutcome = Mock(SubscriptionEventResponseOutcome)
+ def mockSubscriptionEventResponseMapper = Mock(CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper)
+ def mockSubscriptionEventResponseOutcome = Mock(CmSubscriptionNcmpOutEventPublisher)
- def objectUnderTest = new SubscriptionEventResponseConsumer(mockForwardedSubscriptionEventCache,
+ def objectUnderTest = new CmSubscriptionDmiOutEventConsumer(mockForwardedSubscriptionEventCache,
mockSubscriptionPersistence, mockSubscriptionEventResponseMapper, mockSubscriptionEventResponseOutcome)
def 'Consume Subscription Event Response where all DMIs have responded'() {
}
def getSubscriptionResponseEvent() {
- def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
- return jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiOutEvent.json')
+ return jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, CmSubscriptionDmiOutEvent.class)
}
def getCloudEventHavingSubscriptionResponseEvent() {
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import org.mapstruct.factory.Mappers
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.spi.exceptions.DataValidationException
import org.onap.cps.utils.JsonObjectMapper
@SpringBootTest(classes = [JsonObjectMapper, ObjectMapper])
-class SubscriptionOutcomeMapperSpec extends Specification {
+class CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapperSpec extends Specification {
- SubscriptionOutcomeMapper objectUnderTest = Mappers.getMapper(SubscriptionOutcomeMapper)
+ CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper objectUnderTest = Mappers.getMapper(CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper)
@Autowired
JsonObjectMapper jsonObjectMapper
def 'Map subscription event response to subscription event outcome'() {
given: 'a Subscription Response Event'
- def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
- def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiOutEvent.json')
+ def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, CmSubscriptionDmiOutEvent.class)
when: 'the subscription response event is mapped to a subscription event outcome'
- def result = objectUnderTest.toSubscriptionEventOutcome(subscriptionResponseEvent)
+ def result = objectUnderTest.toCmSubscriptionNcmpOutEvent(subscriptionResponseEvent)
then: 'the resulting subscription event outcome contains expected pending targets per details grouping'
def pendingCmHandleTargetsPerDetails = result.getData().getAdditionalInfo().getPending()
- assert pendingCmHandleTargetsPerDetails.get(0).getDetails() == 'EMS or node connectivity issues, retrying'
- assert pendingCmHandleTargetsPerDetails.get(0).getTargets() == ['CMHandle5', 'CMHandle6','CMHandle7']
+ assert pendingCmHandleTargetsPerDetails.get(0).getDetails() == 'No reply from DMI yet'
+ assert pendingCmHandleTargetsPerDetails.get(0).getTargets() == ['CMHandle3', 'CMHandle4']
and: 'the resulting subscription event outcome contains expected rejected targets per details grouping'
def rejectedCmHandleTargetsPerDetails = result.getData().getAdditionalInfo().getRejected()
- assert rejectedCmHandleTargetsPerDetails.get(0).getDetails() == 'Target(s) do not exist'
- assert rejectedCmHandleTargetsPerDetails.get(0).getTargets() == ['CMHandle4']
- assert rejectedCmHandleTargetsPerDetails.get(1).getDetails() == 'Faulty subscription format for target(s)'
- assert rejectedCmHandleTargetsPerDetails.get(1).getTargets() == ['CMHandle1', 'CMHandle2','CMHandle3']
+ assert rejectedCmHandleTargetsPerDetails.get(0).getDetails() == 'Some other error message from the DMI'
+ assert rejectedCmHandleTargetsPerDetails.get(0).getTargets() == ['CMHandle2']
+ assert rejectedCmHandleTargetsPerDetails.get(1).getDetails() == 'Some error message from the DMI'
+ assert rejectedCmHandleTargetsPerDetails.get(1).getTargets() == ['CMHandle1']
}
def 'Map subscription event response with null of subscription status list to subscription event outcome causes an exception'() {
given: 'a Subscription Response Event'
- def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
- def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiOutEvent.json')
+ def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, CmSubscriptionDmiOutEvent.class)
and: 'set subscription status list to null'
subscriptionResponseEvent.getData().setSubscriptionStatus(subscriptionStatusList)
when: 'the subscription response event is mapped to a subscription event outcome'
- objectUnderTest.toSubscriptionEventOutcome(subscriptionResponseEvent)
+ objectUnderTest.toCmSubscriptionNcmpOutEvent(subscriptionResponseEvent)
then: 'a DataValidationException is thrown with an expected exception details'
def exception = thrown(DataValidationException)
exception.details == 'SubscriptionStatus list cannot be null or empty'
def 'Map subscription event response with subscription status list to subscription event outcome without any exception'() {
given: 'a Subscription Response Event'
- def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
- def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiOutEvent.json')
+ def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, CmSubscriptionDmiOutEvent.class)
when: 'the subscription response event is mapped to a subscription event outcome'
- objectUnderTest.toSubscriptionEventOutcome(subscriptionResponseEvent)
+ objectUnderTest.toCmSubscriptionNcmpOutEvent(subscriptionResponseEvent)
then: 'no exception thrown'
noExceptionThrown()
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import org.mapstruct.factory.Mappers
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
@SpringBootTest(classes = [JsonObjectMapper, ObjectMapper])
-class SubscriptionEventResponseMapperSpec extends Specification {
+class CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapperSpec extends Specification {
- SubscriptionEventResponseMapper objectUnderTest = Mappers.getMapper(SubscriptionEventResponseMapper)
+ CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper objectUnderTest = Mappers.getMapper(CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper)
@Autowired
JsonObjectMapper jsonObjectMapper
def 'Map subscription response event to yang model subscription event'() {
given: 'a Subscription Response Event'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
- def testEventToMap = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEventResponse.class)
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiOutEvent.json')
+ def testEventToMap = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionDmiOutEvent.class)
when: 'the event is mapped to a yang model subscription'
def result = objectUnderTest.toYangModelSubscriptionEvent(testEventToMap)
then: 'the resulting yang model subscription event contains the correct clientId'
and: 'subscription name'
assert result.subscriptionName == "cm-subscription-001"
and: 'predicate targets cm handle size as expected'
- assert result.predicates.targetCmHandles.size() == 7
+ assert result.predicates.targetCmHandles.size() == 4
and: 'predicate targets cm handle ids as expected'
- assert result.predicates.targetCmHandles.cmHandleId == ["CMHandle1", "CMHandle2", "CMHandle3", "CMHandle4", "CMHandle5", "CMHandle6", "CMHandle7"]
+ assert result.predicates.targetCmHandles.cmHandleId == ["CMHandle1", "CMHandle2", "CMHandle3", "CMHandle4"]
and: 'the status for these targets is set to expected values'
- assert result.predicates.targetCmHandles.status == [SubscriptionStatus.REJECTED, SubscriptionStatus.REJECTED, SubscriptionStatus.REJECTED, SubscriptionStatus.REJECTED, SubscriptionStatus.PENDING, SubscriptionStatus.PENDING, SubscriptionStatus.PENDING]
+ assert result.predicates.targetCmHandles.status == [SubscriptionStatus.REJECTED, SubscriptionStatus.REJECTED, SubscriptionStatus.PENDING, SubscriptionStatus.PENDING]
}
}
\ No newline at end of file
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
+class CmSubscriptionNcmpInEventConsumerSpec extends MessagingBaseSpec {
- def mockSubscriptionEventForwarder = Mock(SubscriptionEventForwarder)
- def mockSubscriptionEventMapper = Mock(SubscriptionEventMapper)
+ def mockCmSubscriptionNcmpInEventForwarder = Mock(CmSubscriptionNcmpInEventForwarder)
+ def mockCmSubscriptionNcmpInEventMapper = Mock(CmSubscriptionNcmpInEventMapper)
def mockSubscriptionPersistence = Mock(SubscriptionPersistence)
- def objectUnderTest = new SubscriptionEventConsumer(mockSubscriptionEventForwarder, mockSubscriptionEventMapper, mockSubscriptionPersistence)
+ def objectUnderTest = new CmSubscriptionNcmpInEventConsumer(mockCmSubscriptionNcmpInEventForwarder, mockCmSubscriptionNcmpInEventMapper, mockSubscriptionPersistence)
def yangModelSubscriptionEvent = new YangModelSubscriptionEvent()
def 'Consume, persist and forward valid CM create message'() {
given: 'an event with data category CM'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpInEvent.json')
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class)
testEventSent.getData().getDataType().setDataCategory(dataCategory)
def testCloudEventSent = CloudEventBuilder.v1()
.withData(objectMapper.writeValueAsBytes(testEventSent))
when: 'the valid event is consumed'
objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'the event is mapped to a yangModelSubscription'
- numberOfTimesToPersist * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent) >> yangModelSubscriptionEvent
+ numberOfTimesToPersist * mockCmSubscriptionNcmpInEventMapper.toYangModelSubscriptionEvent(testEventSent) >> yangModelSubscriptionEvent
and: 'the event is persisted'
numberOfTimesToPersist * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent)
and: 'the event is forwarded'
- numberOfTimesToForward * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent, 'subscriptionCreated')
+ numberOfTimesToForward * mockCmSubscriptionNcmpInEventForwarder.forwardCreateSubscriptionEvent(testEventSent, 'subscriptionCreated')
where: 'given values are used'
scenario | dataCategory | dataType | isNotificationEnabled | isModelLoaderEnabled || numberOfTimesToForward || numberOfTimesToPersist
'Both model loader and notification are enabled' | 'CM' | 'subscriptionCreated' | true | true || 1 || 1
def 'Consume event with wrong datastore causes an exception'() {
given: 'an event'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpInEvent.json')
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class)
and: 'datastore is set to a passthrough-running datastore'
testEventSent.getData().getPredicates().setDatastore('operational')
def testCloudEventSent = CloudEventBuilder.v1()
.withType('some-event-type')
.withSource(URI.create('some-resource'))
.withExtension('correlationid', 'test-cmhandle1').build()
- def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+ def consumerRecord = new ConsumerRecord<String, CmSubscriptionNcmpInEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
when: 'the valid event is consumed'
objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'an operation not supported exception is thrown'
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import com.hazelcast.map.IMap
import io.cloudevents.CloudEvent
-import io.cloudevents.core.CloudEventUtils
-import io.cloudevents.core.data.PojoCloudEventData
-import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.mapstruct.factory.Mappers
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus
+import org.onap.cps.ncmp.api.impl.utils.CmSubscriptionEventCloudMapper
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent.TargetCmHandle
import org.onap.cps.ncmp.api.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.Data
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmHandle
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import spock.util.concurrent.BlockingVariable
-
import java.util.concurrent.TimeUnit
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionEventForwarder])
-class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
- @Autowired
- SubscriptionEventForwarder objectUnderTest
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, CmSubscriptionNcmpInEventForwarder])
+class CmSubscriptionNcmpInEventForwarderSpec extends MessagingBaseSpec {
+ @Autowired
+ CmSubscriptionNcmpInEventForwarder objectUnderTest
@SpringBean
InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
@SpringBean
@SpringBean
IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
@SpringBean
- SubscriptionEventResponseOutcome mockSubscriptionEventResponseOutcome = Mock(SubscriptionEventResponseOutcome)
+ CmSubscriptionEventCloudMapper subscriptionEventCloudMapper = new CmSubscriptionEventCloudMapper(new ObjectMapper())
+ @SpringBean
+ CmSubscriptionNcmpOutEventPublisher mockCmSubscriptionNcmpOutEventPublisher = Mock(CmSubscriptionNcmpOutEventPublisher)
@SpringBean
SubscriptionPersistence mockSubscriptionPersistence = Mock(SubscriptionPersistence)
@SpringBean
- SubscriptionEventMapper subscriptionEventMapper = Mappers.getMapper(SubscriptionEventMapper)
+ CmSubscriptionNcmpInEventMapper cmSubscriptionNcmpInEventMapper = Mappers.getMapper(CmSubscriptionNcmpInEventMapper)
@SpringBean
- ClientSubscriptionEventMapper clientSubscriptionEventMapper = Mappers.getMapper(ClientSubscriptionEventMapper)
+ CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper cmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper = Mappers.getMapper(CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper)
@Autowired
JsonObjectMapper jsonObjectMapper
-
- def objectMapper = new ObjectMapper()
+ @Autowired
+ ObjectMapper objectMapper
def 'Forward valid CM create subscription and simulate timeout'() {
given: 'an event'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpInEvent.json')
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class)
and: 'the InventoryPersistence returns private properties for the supplied CM Handles'
1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> [
- createYangModelCmHandleWithDmiProperty(1, 1,"shape","circle"),
- createYangModelCmHandleWithDmiProperty(2, 1,"shape","square")
+ createYangModelCmHandleWithDmiProperty(1, 1, "shape", "circle"),
+ createYangModelCmHandleWithDmiProperty(2, 1, "shape", "square")
]
and: 'the thread creation delay is reduced to 2 seconds for testing'
objectUnderTest.dmiResponseTimeoutInMs = 2000
and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'
1 * mockSubscriptionEventPublisher.publishCloudEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1",
cloudEvent -> {
- def targets = toSubscriptionEvent(cloudEvent).getData().getPredicates().getTargets()
- def cmHandle2 = createCmHandle('CMHandle2', ['shape':'square'] as Map)
- def cmHandle1 = createCmHandle('CMHandle1', ['shape':'circle'] as Map)
+ def targets = toTargetEvent(cloudEvent, CmSubscriptionDmiInEvent.class).getData().getPredicates().getTargets()
+ def cmHandle2 = createCmHandle('CMHandle2', ['shape': 'square'] as Map)
+ def cmHandle1 = createCmHandle('CMHandle1', ['shape': 'circle'] as Map)
targets == [cmHandle2, cmHandle1]
}
)
and: 'a separate thread has been created where the map is polled'
1 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true
- 1 * mockSubscriptionEventResponseOutcome.sendResponse(*_)
+ 1 * mockCmSubscriptionNcmpOutEventPublisher.sendResponse(*_)
and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable'
- 1 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)}
+ 1 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> { block.set(_) }
}
def 'Forward CM create subscription where target CM Handles are #scenario'() {
given: 'an event'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpInEvent.json')
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class)
and: 'the target CMHandles are set to #scenario'
testEventSent.getData().getPredicates().setTargets(invalidTargets)
when: 'the event is forwarded'
def 'Forward valid CM create subscription where targets are not associated to any existing CMHandles'() {
given: 'an event'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpInEvent.json')
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class)
and: 'a subscription event response'
- def emptySubscriptionEventResponse = new SubscriptionEventResponse().withData(new Data());
+ def emptySubscriptionEventResponse = new CmSubscriptionDmiOutEvent().withData(new Data());
emptySubscriptionEventResponse.getData().setSubscriptionName('cm-subscription-001');
emptySubscriptionEventResponse.getData().setClientId('SCO-9989752');
and: 'the cm handles will be rejected'
def rejectedCmHandles = [new TargetCmHandle('CMHandle1', SubscriptionStatus.REJECTED, 'Cm handle does not exist'),
- new TargetCmHandle('CMHandle2',SubscriptionStatus.REJECTED, 'Cm handle does not exist'),
- new TargetCmHandle('CMHandle3',SubscriptionStatus.REJECTED, 'Cm handle does not exist')]
+ new TargetCmHandle('CMHandle2', SubscriptionStatus.REJECTED, 'Cm handle does not exist'),
+ new TargetCmHandle('CMHandle3', SubscriptionStatus.REJECTED, 'Cm handle does not exist')]
and: 'a yang model subscription event will be saved into the db with rejected cm handles'
- def yangModelSubscriptionEvent = subscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent)
+ def yangModelSubscriptionEvent = cmSubscriptionNcmpInEventMapper.toYangModelSubscriptionEvent(testEventSent)
yangModelSubscriptionEvent.getPredicates().setTargetCmHandles(rejectedCmHandles)
and: 'the InventoryPersistence returns no private properties for the supplied CM Handles'
1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> []
and: 'the event is not being forwarded with the CMHandle private properties and does not provides a valid listenable future'
0 * mockSubscriptionEventPublisher.publishCloudEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1",
cloudEvent -> {
- def targets = toSubscriptionEvent(cloudEvent).getData().getPredicates().getTargets()
- def cmHandle2 = createCmHandle('CMHandle2', ['shape':'square'] as Map)
- def cmHandle1 = createCmHandle('CMHandle1', ['shape':'circle'] as Map)
+ def targets = toTargetEvent(cloudEvent, CmSubscriptionDmiInEvent.class).getData().getPredicates().getTargets()
+ def cmHandle2 = createCmHandle('CMHandle2', ['shape': 'square'] as Map)
+ def cmHandle1 = createCmHandle('CMHandle1', ['shape': 'circle'] as Map)
targets == [cmHandle2, cmHandle1]
}
)
0 * mockSubscriptionEventPublisher.publishCloudEvent("ncmp-dmi-cm-avc-subscription-DMIName2", "SCO-9989752-cm-subscription-001-DMIName2",
cloudEvent -> {
- def targets = toSubscriptionEvent(cloudEvent).getData().getPredicates().getTargets()
- def cmHandle3 = createCmHandle('CMHandle3', ['shape':'triangle'] as Map)
+ def targets = toTargetEvent(cloudEvent, CmSubscriptionDmiInEvent.class).getData().getPredicates().getTargets()
+ def cmHandle3 = createCmHandle('CMHandle3', ['shape': 'triangle'] as Map)
targets == [cmHandle3]
}
)
0 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true
0 * mockForwardedSubscriptionEventCache.get(_)
and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable'
- 0 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)}
+ 0 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> { block.set(_) }
and: 'the persistence service save target cm handles of the yang model subscription event as rejected '
1 * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent)
and: 'subscription outcome has been sent'
- 1 * mockSubscriptionEventResponseOutcome.sendResponse(emptySubscriptionEventResponse, 'subscriptionCreatedStatus')
+ 1 * mockCmSubscriptionNcmpOutEventPublisher.sendResponse(emptySubscriptionEventResponse, 'subscriptionCreatedStatus')
}
- static def createYangModelCmHandleWithDmiProperty(id, dmiId,propertyName, propertyValue) {
- return new YangModelCmHandle(id:"CMHandle" + id, dmiDataServiceName: "DMIName" + dmiId, dmiProperties: [new YangModelCmHandle.Property(propertyName,propertyValue)])
+ static def createYangModelCmHandleWithDmiProperty(id, dmiId, propertyName, propertyValue) {
+ return new YangModelCmHandle(id: "CMHandle" + id, dmiDataServiceName: "DMIName" + dmiId, dmiProperties: [new YangModelCmHandle.Property(propertyName, propertyValue)])
}
static def createCmHandle(id, additionalProperties) {
return cmHandle
}
- def toSubscriptionEvent(cloudEvent) {
- final PojoCloudEventData<org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent> deserializedCloudEvent = CloudEventUtils
- .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper,
- org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent.class));
- if (deserializedCloudEvent == null) {
- return null;
- } else {
- return deserializedCloudEvent.getValue();
- }
- }
-
}
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import org.mapstruct.factory.Mappers
-import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventMapper
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.beans.factory.annotation.Autowired
@SpringBootTest(classes = [JsonObjectMapper, ObjectMapper])
-class SubscriptionEventMapperSpec extends Specification {
+class CmSubscriptionNcmpInEventMapperSpec extends Specification {
- SubscriptionEventMapper objectUnderTest = Mappers.getMapper(SubscriptionEventMapper)
+ CmSubscriptionNcmpInEventMapper objectUnderTest = Mappers.getMapper(CmSubscriptionNcmpInEventMapper)
@Autowired
JsonObjectMapper jsonObjectMapper
def 'Map subscription event to yang model subscription event where #scenario'() {
given: 'a Subscription Event'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
- def testEventToMap = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpInEvent.json')
+ def testEventToMap = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpInEvent.class)
when: 'the event is mapped to a yang model subscription'
def result = objectUnderTest.toYangModelSubscriptionEvent(testEventToMap)
then: 'the resulting yang model subscription event contains the correct clientId'
def 'Map empty subscription event to yang model subscription event'() {
given: 'a new Subscription Event with no data'
- def testEventToMap = new SubscriptionEvent()
+ def testEventToMap = new CmSubscriptionNcmpInEvent()
when: 'the event is mapped to a yang model subscription'
def result = objectUnderTest.toYangModelSubscriptionEvent(testEventToMap)
then: 'the resulting yang model subscription event contains null clientId'
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avcsubscription
+package org.onap.cps.ncmp.api.impl.events.cmsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence
import org.onap.cps.ncmp.api.impl.utils.DataNodeBaseSpec
import org.onap.cps.ncmp.api.impl.utils.SubscriptionOutcomeCloudMapper
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome;
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionOutcomeMapper, SubscriptionEventResponseOutcome])
-class SubscriptionEventResponseOutcomeSpec extends DataNodeBaseSpec {
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper, CmSubscriptionNcmpOutEventPublisher])
+class CmSubscriptionNcmpOutEventPublisherSpec extends DataNodeBaseSpec {
@Autowired
- SubscriptionEventResponseOutcome objectUnderTest
+ CmSubscriptionNcmpOutEventPublisher objectUnderTest
@SpringBean
SubscriptionPersistence mockSubscriptionPersistence = Mock(SubscriptionPersistence)
@SpringBean
- EventsPublisher<CloudEvent> mockSubscriptionEventOutcomePublisher = Mock(EventsPublisher<CloudEvent>)
+ EventsPublisher<CloudEvent> mockCmSubscriptionNcmpOutEventPublisher = Mock(EventsPublisher<CloudEvent>)
@SpringBean
- SubscriptionOutcomeMapper subscriptionOutcomeMapper = Mappers.getMapper(SubscriptionOutcomeMapper)
+ CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper cmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper = Mappers.getMapper(CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper)
+ @SpringBean
+ SubscriptionOutcomeCloudMapper subscriptionOutcomeCloudMapper = new SubscriptionOutcomeCloudMapper(new ObjectMapper())
@Autowired
JsonObjectMapper jsonObjectMapper
def 'Send response to the client apps successfully'() {
given: 'a subscription response event'
- def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
- def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiOutEvent.json')
+ def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, CmSubscriptionDmiOutEvent.class)
and: 'a subscription outcome event'
- def subscriptionOutcomeJsonData = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent2.json')
- def subscriptionOutcomeEvent = jsonObjectMapper.convertJsonString(subscriptionOutcomeJsonData, SubscriptionEventOutcome.class)
+ def subscriptionOutcomeJsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpOutEvent2.json')
+ def subscriptionOutcomeEvent = jsonObjectMapper.convertJsonString(subscriptionOutcomeJsonData, CmSubscriptionNcmpOutEvent.class)
and: 'a random id for the cloud event'
SubscriptionOutcomeCloudMapper.randomId = 'some-id'
and: 'a cloud event containing the outcome event'
.withData(objectMapper.writeValueAsBytes(subscriptionOutcomeEvent))
.withId('some-id')
.withType('subscriptionCreatedStatus')
- .withDataSchema(URI.create('urn:cps:' + 'org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome' + ':1.0.0'))
+ .withDataSchema(URI.create('urn:cps:' + 'org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent' + ':1.0.0'))
.withExtension("correlationid", 'SCO-9989752cm-subscription-001')
.withSource(URI.create('NCMP')).build()
and: 'the persistence service return a data node that includes pending cm handles that makes it partial success'
when: 'the response is being sent'
objectUnderTest.sendResponse(subscriptionResponseEvent, 'subscriptionCreatedStatus')
then: 'the publisher publish the cloud event with itself and expected parameters'
- 1 * mockSubscriptionEventOutcomePublisher.publishCloudEvent('subscription-response', 'SCO-9989752cm-subscription-001', testCloudEventSent)
+ 1 * mockCmSubscriptionNcmpOutEventPublisher.publishCloudEvent('subscription-response', 'SCO-9989752cm-subscription-001', testCloudEventSent)
}
def 'Create subscription outcome message as expected'() {
given: 'a subscription response event'
- def subscriptionResponseJsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
- def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, SubscriptionEventResponse.class)
+ def subscriptionResponseJsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiOutEvent.json')
+ def subscriptionResponseEvent = jsonObjectMapper.convertJsonString(subscriptionResponseJsonData, CmSubscriptionDmiOutEvent.class)
and: 'a subscription outcome event'
- def subscriptionOutcomeJsonData = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent.json')
- def subscriptionOutcomeEvent = jsonObjectMapper.convertJsonString(subscriptionOutcomeJsonData, SubscriptionEventOutcome.class)
+ def subscriptionOutcomeJsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpOutEvent.json')
+ def subscriptionOutcomeEvent = jsonObjectMapper.convertJsonString(subscriptionOutcomeJsonData, CmSubscriptionNcmpOutEvent.class)
and: 'a status code and status message a per #scenarios'
subscriptionOutcomeEvent.getData().setStatusCode(statusCode)
subscriptionOutcomeEvent.getData().setStatusMessage(statusMessage)
when: 'a subscription event outcome message is being formed'
- def result = objectUnderTest.fromSubscriptionEventResponse(subscriptionResponseEvent, ncmpEventResponseCode)
+ def result = objectUnderTest.fromDmiOutEvent(subscriptionResponseEvent, ncmpEventResponseCode)
then: 'the result will be equal to event outcome'
result == subscriptionOutcomeEvent
where: 'the following values are used'
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.mapper
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent
+import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+class CloudEventMapperSpec extends Specification {
+
+ @Autowired
+ JsonObjectMapper jsonObjectMapper
+
+ def 'Cloud event to Target event type when it is #scenario'() {
+ expect: 'Events mapped correctly'
+ assert mappedCloudEvent == (CloudEventMapper.toTargetEvent(testCloudEvent(), targetClass) != null)
+ where: 'below are the scenarios'
+ scenario | targetClass || mappedCloudEvent
+ 'valid concrete type' | CmSubscriptionNcmpInEvent.class || true
+ 'invalid concrete type' | ArrayList.class || false
+ }
+
+ def testCloudEvent() {
+ return CloudEventBuilder.v1().withData(jsonObjectMapper.asJsonBytes(new CmSubscriptionNcmpInEvent()))
+ .withId("cmhandle1")
+ .withSource(URI.create('test-source'))
+ .withDataSchema(URI.create('test'))
+ .withType('org.onap.cm.events.cm-subscription')
+ .build()
+ }
+}
package org.onap.cps.ncmp.api.impl.operations
import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.core.CloudEventUtils
-import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.onap.cps.ncmp.api.NcmpEventResponseCode
import org.onap.cps.ncmp.api.impl.config.NcmpConfiguration
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE
import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ
import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
@SpringBootTest
@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, NcmpConfiguration.DmiProperties, DmiDataOperations])
and: 'a positive response from DMI service when it is called with valid request parameters'
def responseFromDmi = new ResponseEntity<Object>(HttpStatus.ACCEPTED)
def expectedDmiBatchResourceDataUrl = "ncmp/v1/data/topic=my-topic-name"
- def expectedBatchRequestAsJson = '[{"operation":"read","operationId":"operational-14","datastore":"ncmp-datastore:passthrough-operational","options":"some option","resourceIdentifier":"some resource identifier","cmHandles":[{"id":"some-cm-handle","cmHandleProperties":{"prop1":"val1"}}]}]'
+ def expectedBatchRequestAsJson = '{"operations":[{"operation":"read","operationId":"operational-14","datastore":"ncmp-datastore:passthrough-operational","options":"some option","resourceIdentifier":"some resource identifier","cmHandles":[{"id":"some-cm-handle","cmHandleProperties":{"prop1":"val1"}}]}]}'
mockDmiRestClient.postOperationWithJsonData(expectedDmiBatchResourceDataUrl, _, READ.operationName) >> responseFromDmi
dmiServiceUrlBuilder.getDataOperationRequestUrl(_, _) >> expectedDmiBatchResourceDataUrl
when: 'get resource data for group of cm handles are invoked'
}
def extractDataValue(actualDataOperationCloudEvent) {
- return CloudEventUtils.mapData(actualDataOperationCloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), DataOperationEvent.class)).getValue().data.responses[0]
+ return toTargetEvent(actualDataOperationCloudEvent, DataOperationEvent.class).data.responses[0]
}
}
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+class CmSubscriptionEventCloudMapperSpec extends Specification {
+
+ @Autowired
+ JsonObjectMapper jsonObjectMapper
+
+ @Autowired
+ ObjectMapper objectMapper
+
+ def spyObjectMapper = Spy(ObjectMapper)
+
+ def objectUnderTest = new CmSubscriptionEventCloudMapper(spyObjectMapper)
+
+ def 'Map the subscription event to data of the cloud event'() {
+ given: 'a subscription event'
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiInEvent.json')
+ def testEventData = jsonObjectMapper.convertJsonString(jsonData,
+ CmSubscriptionDmiInEvent.class)
+ def testCloudEvent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(testEventData))
+ .withId('some-id')
+ .withType('subscriptionCreated')
+ .withSource(URI.create('SCO-9989752'))
+ .withExtension('correlationid', 'test-cmhandle1').build()
+ when: 'the subscription event map to data of cloud event'
+ CmSubscriptionEventCloudMapper.randomId = 'some-id'
+ def resultCloudEvent = objectUnderTest.toCloudEvent(testEventData, 'some-event-key', 'subscriptionCreated')
+ then: 'the subscription event resulted having expected values'
+ resultCloudEvent.getData() == testCloudEvent.getData()
+ resultCloudEvent.getId() == testCloudEvent.getId()
+ resultCloudEvent.getType() == testCloudEvent.getType()
+ resultCloudEvent.getSource() == URI.create('SCO-9989752')
+ resultCloudEvent.getDataSchema() == URI.create('urn:cps:org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent:1.0.0')
+ }
+
+ def 'Map the subscription event to cloud event with JSON processing exception'() {
+ given: 'a json processing exception during process'
+ def jsonProcessingException = new JsonProcessingException('The Cloud Event could not be constructed')
+ spyObjectMapper.writeValueAsBytes(_) >> { throw jsonProcessingException }
+ and: 'a subscription event of ncmp version'
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionDmiInEvent.json')
+ def testEventData = jsonObjectMapper.convertJsonString(jsonData,
+ CmSubscriptionDmiInEvent.class)
+ when: 'the subscription event map to cloud event'
+ def expectedResult = objectUnderTest.toCloudEvent(testEventData, 'some-key', 'some-event-type')
+ then: 'no exception is thrown since it has been handled already'
+ noExceptionThrown()
+ and: 'expected result should be null'
+ expectedResult == null
+ }
+
+}
+++ /dev/null
-/*
- * ============LICENSE_START========================================================
- * Copyright (c) 2023 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an 'AS IS' BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.utils
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.core.builder.CloudEventBuilder
-import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent
-import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.utils.JsonObjectMapper
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-import spock.lang.Specification
-
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class SubscriptionEventCloudMapperSpec extends Specification {
-
- @Autowired
- JsonObjectMapper jsonObjectMapper
-
- @Autowired
- ObjectMapper objectMapper
-
- def 'Map the data of the cloud event to subscription event'() {
- given: 'a cloud event having a subscription event in the data part'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
- def testEventData = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
- def testCloudEvent = CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(testEventData))
- .withId('some-event-id')
- .withType('subscriptionCreated')
- .withSource(URI.create('some-resource'))
- .withExtension('correlationid', 'test-cmhandle1').build()
- when: 'the cloud event map to subscription event'
- def resultSubscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(testCloudEvent)
- then: 'the subscription event resulted having expected values'
- resultSubscriptionEvent.getData() == testEventData.getData()
- }
-
- def 'Map the null of the data of the cloud event to subscription event'() {
- given: 'a cloud event having a null subscription event in the data part'
- def testCloudEvent = CloudEventBuilder.v1()
- .withData(null)
- .withId('some-event-id')
- .withType('subscriptionCreated')
- .withSource(URI.create('some-resource'))
- .withExtension('correlationid', 'test-cmhandle1').build()
- when: 'the cloud event map to subscription event'
- def resultSubscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(testCloudEvent)
- then: 'the subscription event resulted having a null value'
- resultSubscriptionEvent == null
- }
-
- def 'Map the subscription event to data of the cloud event'() {
- given: 'a subscription event'
- def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEventNcmpVersion.json')
- def testEventData = jsonObjectMapper.convertJsonString(jsonData,
- org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent.class)
- def testCloudEvent = CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(testEventData))
- .withId('some-id')
- .withType('subscriptionCreated')
- .withSource(URI.create('SCO-9989752'))
- .withExtension('correlationid', 'test-cmhandle1').build()
- when: 'the subscription event map to data of cloud event'
- SubscriptionEventCloudMapper.randomId = 'some-id'
- def resultCloudEvent = SubscriptionEventCloudMapper.toCloudEvent(testEventData, 'some-event-key', 'subscriptionCreated')
- then: 'the subscription event resulted having expected values'
- resultCloudEvent.getData() == testCloudEvent.getData()
- resultCloudEvent.getId() == testCloudEvent.getId()
- resultCloudEvent.getType() == testCloudEvent.getType()
- resultCloudEvent.getSource() == URI.create('SCO-9989752')
- resultCloudEvent.getDataSchema() == URI.create('urn:cps:org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent:1.0.0')
- }
-
- def 'Map the subscription event to data of the cloud event with wrong content causes an exception'() {
- given: 'an empty ncmp subscription event'
- def testNcmpSubscriptionEvent = new org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent()
- when: 'the subscription event map to data of cloud event'
- SubscriptionEventCloudMapper.toCloudEvent(testNcmpSubscriptionEvent, 'some-key', 'some-event-type')
- then: 'a run time exception is thrown'
- def exception = thrown(CloudEventConstructionException)
- exception.details == 'Invalid object to serialize or required headers is missing'
- }
-
-}
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils
+
+import com.fasterxml.jackson.core.JsonProcessingException
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+class SubscriptionOutcomeCloudMapperSpec extends Specification {
+
+ @Autowired
+ JsonObjectMapper jsonObjectMapper
+
+ @Autowired
+ ObjectMapper objectMapper
+
+ def spyObjectMapper = Spy(ObjectMapper)
+
+ def objectUnderTest = new SubscriptionOutcomeCloudMapper(spyObjectMapper)
+
+ def 'Map the subscription outcome to cloud event'() {
+ given: 'a subscription event'
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpOutEvent.json')
+ def testEventData = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpOutEvent.class)
+ def testCloudEvent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(testEventData))
+ .withId('some-id')
+ .withType('subscriptionCreatedStatus')
+ .withSource(URI.create('NCMP'))
+ .withExtension('correlationid', 'test-cmhandle1').build()
+ when: 'the subscription event map to data of cloud event'
+ SubscriptionOutcomeCloudMapper.randomId = 'some-id'
+ def resultCloudEvent = objectUnderTest.toCloudEvent(testEventData, 'some-event-key', 'subscriptionCreatedStatus')
+ then: 'the subscription event resulted having expected values'
+ resultCloudEvent.getData() == testCloudEvent.getData()
+ resultCloudEvent.getId() == testCloudEvent.getId()
+ resultCloudEvent.getType() == testCloudEvent.getType()
+ resultCloudEvent.getSource() == testCloudEvent.getSource()
+ resultCloudEvent.getDataSchema() == URI.create('urn:cps:org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent:1.0.0')
+ }
+
+ def 'Map the subscription outcome to cloud event with JSON processing exception'() {
+ given: 'a json processing exception during process'
+ def jsonProcessingException = new JsonProcessingException('The Cloud Event could not be constructed')
+ spyObjectMapper.writeValueAsBytes(_) >> { throw jsonProcessingException }
+ and: 'a cloud event having a subscription outcome in the data part'
+ def jsonData = TestUtils.getResourceFileContent('cmSubscriptionNcmpOutEvent.json')
+ def testEventData = jsonObjectMapper.convertJsonString(jsonData, CmSubscriptionNcmpOutEvent.class)
+ when: 'the subscription outcome map to cloud event'
+ def expectedResult = objectUnderTest.toCloudEvent(testEventData, 'some-key', 'some-event-type')
+ then: 'no exception is thrown since it has been handled already'
+ noExceptionThrown()
+ and: 'expected result should be null'
+ expectedResult == null
+ }
+
+}
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
-import io.cloudevents.core.CloudEventUtils
-import io.cloudevents.jackson.PojoCloudEventDataMapper
import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.springframework.test.context.ContextConfiguration
import java.time.Duration
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
+
@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper])
class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id'
assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
and: 'map consumer record to expected event type'
- def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
- PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
+ def dataOperationResponseEvent =
+ toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
and: 'data operation response event response size is 3'
dataOperationResponseEvent.data.responses.size() == 3
and: 'verify published response data as json string'
+++ /dev/null
-{
- "data": {
- "clientId": "SCO-9989752",
- "subscriptionName": "cm-subscription-001",
- "dmiName": "dminame1",
- "subscriptionStatus": [
- {
- "id": "CMHandle1",
- "status": "REJECTED",
- "details": "Faulty subscription format for target(s)"
- },
- {
- "id": "CMHandle2",
- "status": "REJECTED",
- "details": "Faulty subscription format for target(s)"
- },
- {
- "id": "CMHandle3",
- "status": "REJECTED",
- "details": "Faulty subscription format for target(s)"
- },
- {
- "id": "CMHandle4",
- "status": "REJECTED",
- "details": "Target(s) do not exist"
- },
- {
- "id": "CMHandle5",
- "status": "PENDING",
- "details": "EMS or node connectivity issues, retrying"
- },
- {
- "id": "CMHandle6",
- "status": "PENDING",
- "details": "EMS or node connectivity issues, retrying"
- },
- {
- "id": "CMHandle7",
- "status": "PENDING",
- "details": "EMS or node connectivity issues, retrying"
- }
- ]
- }
-}
\ No newline at end of file
+++ /dev/null
-{
- "data": {
- "statusCode": 104,
- "statusMessage": "partially applied subscription",
- "additionalInfo": {
- "rejected": [
- {
- "details": "Target(s) do not exist",
- "targets": ["CMHandle4"]
- },
- {
- "details": "Faulty subscription format for target(s)",
- "targets": ["CMHandle1", "CMHandle2", "CMHandle3"]
- }
- ],
- "pending": [
- {
- "details": "EMS or node connectivity issues, retrying",
- "targets": ["CMHandle5", "CMHandle6", "CMHandle7"]
- }
- ]
- }
- }
-}
\ No newline at end of file
--- /dev/null
+{
+ "data": {
+ "clientId": "SCO-9989752",
+ "subscriptionName": "cm-subscription-001",
+ "dmiName": "dminame1",
+ "subscriptionStatus": [
+ {
+ "id": "CMHandle1",
+ "status": "REJECTED",
+ "details": "Some error message from the DMI"
+ },
+ {
+ "id": "CMHandle2",
+ "status": "REJECTED",
+ "details": "Some other error message from the DMI"
+ },
+ {
+ "id": "CMHandle3",
+ "status": "PENDING",
+ "details": "No reply from DMI yet"
+ },
+ {
+ "id": "CMHandle4",
+ "status": "PENDING",
+ "details": "No reply from DMI yet"
+ }
+ ]
+ }
+}
\ No newline at end of file
"CMHandle2",
"CMHandle3"
],
- "datastore": "passthrough-running",
+ "datastore": "ncmp-datastore:passthrough-running",
"datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
}
}
--- /dev/null
+{
+ "data": {
+ "statusCode": 104,
+ "statusMessage": "partially applied subscription",
+ "additionalInfo": {
+ "rejected": [
+ {
+ "details": "Some other error message from the DMI",
+ "targets": ["CMHandle2"]
+ },
+ {
+ "details": "Some error message from the DMI",
+ "targets": ["CMHandle1"]
+ }
+ ],
+ "pending": [
+ {
+ "details": "No reply from DMI yet",
+ "targets": ["CMHandle3", "CMHandle4"]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
final List<Long> anchorIds;
if (paginationOption == NO_PAGINATION) {
- anchorIds = Collections.EMPTY_LIST;
+ anchorIds = Collections.emptyList();
} else {
anchorIds = getAnchorIdsForPagination(dataspaceEntity, cpsPathQuery, paginationOption);
if (anchorIds.isEmpty()) {
.filter(entity -> StringUtils.equals(checksum, (entity.getChecksum())))
.findFirst()
.map(YangResourceEntity::getFileName);
- if (optionalFileName.isPresent()) {
- return optionalFileName.get();
- }
- return "no filename";
+ return optionalFileName.orElse("no filename");
}
private String getDuplicatedChecksumFromException(final ConstraintViolationException exception) {
*/
public Query getQueryForAnchorAndCpsPath(final AnchorEntity anchorEntity, final CpsPathQuery cpsPathQuery) {
return getQueryForDataspaceOrAnchorAndCpsPath(anchorEntity.getDataspace(),
- anchorEntity, cpsPathQuery, Collections.EMPTY_LIST);
+ anchorEntity, cpsPathQuery, Collections.emptyList());
}
/**
import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter;
import org.onap.cps.ncmp.dmi.rest.stub.model.data.operational.CmHandle;
import org.onap.cps.ncmp.dmi.rest.stub.model.data.operational.DataOperationRequest;
-import org.onap.cps.ncmp.dmi.rest.stub.model.data.operational.ResourceDataOperationRequests;
+import org.onap.cps.ncmp.dmi.rest.stub.model.data.operational.DmiDataOperationRequest;
import org.onap.cps.ncmp.dmi.rest.stub.utils.ResourceFileReaderUtil;
import org.onap.cps.ncmp.events.async1_0_0.Data;
import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
/**
* This method is not implemented for ONAP DMI plugin.
*
- * @param topic client given topic name
- * @param requestId requestId generated by NCMP as an ack for client
- * @param resourceDataOperationRequests list of operation details
+ * @param topic client given topic name
+ * @param requestId requestId generated by NCMP as an ack for client
+ * @param dmiDataOperationRequest list of operation details
* @return (@ code ResponseEntity) response entity
*/
@PostMapping("/v1/data")
final String topic,
@RequestParam(value = "requestId")
final String requestId,
- @RequestBody final ResourceDataOperationRequests
- resourceDataOperationRequests) {
- log.info("Request received from the NCMP to DMI Plugin");
- resourceDataOperationRequests.forEach(resourceDataOperationRequest -> {
- final DataOperationEvent dataOperationEvent = getDataOperationEvent(resourceDataOperationRequest);
- resourceDataOperationRequest.getCmHandles().forEach(cmHandle -> {
+ @RequestBody final DmiDataOperationRequest
+ dmiDataOperationRequest) {
+ try {
+ log.info("Request received from the NCMP to DMI Plugin: {}",
+ objectMapper.writeValueAsString(dmiDataOperationRequest));
+ } catch (final JsonProcessingException jsonProcessingException) {
+ log.info("Unable to process dmi data operation request to json string");
+ }
+ dmiDataOperationRequest.getOperations().forEach(dmiDataOperation -> {
+ final DataOperationEvent dataOperationEvent = getDataOperationEvent(dmiDataOperation);
+ dmiDataOperation.getCmHandles().forEach(cmHandle -> {
dataOperationEvent.getData().getResponses().get(0).setIds(List.of(cmHandle.getId()));
final CloudEvent cloudEvent = buildAndGetCloudEvent(topic, requestId, dataOperationEvent);
cloudEventKafkaTemplate.send(ncmpAsyncM2mTopic, UUID.randomUUID().toString(), cloudEvent);
response.setStatusCode(NcmpEventResponseCode.SUCCESS.getStatusCode());
response.setStatusMessage(NcmpEventResponseCode.SUCCESS.getStatusMessage());
response.setIds(dataOperationRequest.getCmHandles().stream().map(CmHandle::getId).collect(Collectors.toList()));
+ response.setResourceIdentifier(dataOperationRequest.getResourceIdentifier());
+ response.setOptions(dataOperationRequest.getOptions());
final String ietfNetworkTopologySample = ResourceFileReaderUtil
.getResourceFileContent(applicationContext.getResource(
ResourceLoader.CLASSPATH_URL_PREFIX
package org.onap.cps.ncmp.dmi.rest.stub.model.data.operational;
+import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
+@JsonInclude(JsonInclude.Include.NON_NULL)
@Setter
@Getter
public class DataOperationRequest {
package org.onap.cps.ncmp.dmi.rest.stub.model.data.operational;
-import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import lombok.Setter;
-public class ResourceDataOperationRequests extends ArrayList<DataOperationRequest> {
- private static final long serialVersionUID = 3553323170854399881L;
+@Setter
+@Getter
+public class DmiDataOperationRequest {
+
+ private List<DataOperationRequest> operations;
}
"type": "string"
}
},
+ "resourceIdentifier": {
+ "description": "The format of resource identifier depend on the associated DMI Plugin implementation. For ONAP DMI Plugin it will be RESTConf paths but it can really be anything.",
+ "type": "string"
+ },
+ "options": {
+ "description": "It is mandatory to add as key(s)=value(s)'. The format of options parameter depend on the associated DMI Plugin implementation.",
+ "type": "string"
+ },
"statusCode": {
"description": "which says success or failure (0-99) are for success and (100-199) are for failure",
"type": "string"
stopWatch.stop()
def durationInMillis = stopWatch.getTotalTimeMillis()
then: 'all data is read within expected time'
- recordAndAssertPerformance("Warming database", 100, durationInMillis)
+ recordAndAssertPerformance("Warming database", TimeUnit.SECONDS.toMillis(200), durationInMillis)
}
}
stopWatch.stop()
def deleteDurationInMillis = stopWatch.getTotalTimeMillis()
then: 'delete duration is within expected time'
- recordAndAssertPerformance('Batch delete 100 non-existing', TimeUnit.SECONDS.toMillis(6), deleteDurationInMillis)
+ recordAndAssertPerformance('Batch delete 100 non-existing', TimeUnit.SECONDS.toMillis(7), deleteDurationInMillis)
}
def 'Clean up test data'() {