Merge "Reject create request with duplicated subscriptionId"
authorPriyank Maheshwari <priyank.maheshwari@est.tech>
Wed, 24 Apr 2024 16:22:49 +0000 (16:22 +0000)
committerGerrit Code Review <gerrit@onap.org>
Wed, 24 Apr 2024 16:22:49 +0000 (16:22 +0000)
20 files changed:
cps-application/src/main/resources/application.yml
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpEvent.java [moved from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java with 83% similarity]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
docs/cps-ncmp-message-status-codes.rst
docs/release-notes.rst
integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmNotificationSubscriptionSpec.groovy
integration-test/src/test/resources/application.yml

index 3c263e3..27bc6c6 100644 (file)
@@ -52,7 +52,7 @@ spring:
             minimumIdle: 5
             maximumPoolSize: 80
             idleTimeout: 60000
-            connectionTimeout: 120000
+            connectionTimeout: 30000
             leakDetectionThreshold: 30000
             pool-name: CpsDatabasePool
 
@@ -170,7 +170,7 @@ logging:
 ncmp:
     dmi:
         httpclient:
-            connectionTimeoutInSeconds: 180
+            connectionTimeoutInSeconds: 30
             maximumConnectionsPerRoute: 50
             maximumConnectionsTotal: 100
             idleConnectionEvictionThresholdInSeconds: 5
index eca7ebf..1f87865 100644 (file)
@@ -25,11 +25,13 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ;
 
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import org.onap.cps.ncmp.api.NetworkCmProxyDataService;
 import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException;
 import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
 import org.onap.cps.ncmp.api.impl.operations.OperationType;
+import org.onap.cps.ncmp.api.impl.utils.data.operation.ResourceDataOperationRequestUtils;
 import org.onap.cps.ncmp.api.models.CmResourceAddress;
 import org.onap.cps.ncmp.api.models.DataOperationRequest;
 import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException;
@@ -46,7 +48,7 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
 
     private static final Object noReturn = null;
 
-    private static final int MAXIMUM_CM_HANDLES_PER_OPERATION = 50;
+    private static final int MAXIMUM_CM_HANDLES_PER_OPERATION = 50000;
 
     private static final String PAYLOAD_TOO_LARGE_TEMPLATE = "Operation '%s' affects too many (%d) cm handles";
 
@@ -99,8 +101,9 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
             final DataOperationRequest dataOperationRequest,
             final String authorization) {
         final String requestId = UUID.randomUUID().toString();
-        cpsNcmpTaskExecutor.executeTask(
+        cpsNcmpTaskExecutor.executeTaskWithErrorHandling(
             getTaskSupplierForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId, authorization),
+            getTaskCompletionHandlerForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId),
             timeOutInMilliSeconds);
         return ResponseEntity.ok(Map.of("requestId", requestId));
     }
@@ -139,4 +142,13 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
         };
     }
 
+    private static BiConsumer<Object, Throwable> getTaskCompletionHandlerForDataOperationRequest(
+            final String topicParamInQuery,
+            final DataOperationRequest dataOperationRequest,
+            final String requestId) {
+        return (result, throwable) ->
+                ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(topicParamInQuery,
+                        requestId, dataOperationRequest, throwable);
+    }
+
 }
index ba68d5b..2601c7a 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2022-2023 Nordix Foundation
+ *  Copyright (C) 2022-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ package org.onap.cps.ncmp.rest.executor;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -31,16 +32,30 @@ import org.springframework.stereotype.Service;
 @Service
 public class CpsNcmpTaskExecutor {
 
+    /**
+     * Execute a task asynchronously, and invoke completion handler when done.
+     *
+     * @param taskSupplier functional method is get() task needed to be executed asynchronously
+     * @param taskCompletionHandler the action to perform on task completion or error
+     * @param timeOutInMillis the time-out value in milliseconds
+     */
+    public void executeTaskWithErrorHandling(final Supplier<Object> taskSupplier,
+                                             final BiConsumer<Object, Throwable> taskCompletionHandler,
+                                             final long timeOutInMillis) {
+        CompletableFuture.supplyAsync(taskSupplier)
+                .orTimeout(timeOutInMillis, MILLISECONDS)
+                .whenCompleteAsync(taskCompletionHandler);
+    }
+
     /**
      * Execute a task asynchronously.
      *
-     * @param taskSupplier functional method is get() task need to executed asynchronously
+     * @param taskSupplier functional method is get() task needed to be executed asynchronously
      * @param timeOutInMillis the time-out value in milliseconds
      */
     public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
-        CompletableFuture.supplyAsync(taskSupplier::get)
-            .orTimeout(timeOutInMillis, MILLISECONDS)
-            .whenCompleteAsync((taskResult, throwable) -> handleTaskCompletion(throwable));
+        executeTaskWithErrorHandling(taskSupplier, (taskResult, throwable) -> handleTaskCompletion(throwable),
+                timeOutInMillis);
     }
 
     private void handleTaskCompletion(final Throwable throwable) {
index a5b1f05..2d7e9b2 100644 (file)
@@ -198,7 +198,7 @@ class NetworkCmProxyControllerSpec extends Specification {
         and: 'async request id is generated'
             assert response.contentAsString.contains('requestId')
         then: 'the request is handled asynchronously'
-            1 * mockCpsTaskExecutor.executeTask(*_)
+            1 * mockCpsTaskExecutor.executeTaskWithErrorHandling(*_)
         where: 'the following data stores are used'
             datastore << [PASSTHROUGH_RUNNING, PASSTHROUGH_OPERATIONAL]
     }
index aef37c9..641715d 100644 (file)
@@ -79,7 +79,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
         when: 'data operation request is executed'
             objectUnderTest.executeRequest('someTopic', new DataOperationRequest(), NO_AUTH_HEADER)
         then: 'the task is executed in an async fashion or not'
-            expectedCalls * spiedCpsNcmpTaskExecutor.executeTask(*_)
+            expectedCalls * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
         where: 'the following parameters are used'
             scenario | notificationFeatureEnabled || expectedCalls
             'on'     | true                       || 1
@@ -101,7 +101,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
         when: 'data operation request is executed'
             objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER)
         then: 'the task is executed in an async fashion'
-            1 * spiedCpsNcmpTaskExecutor.executeTask(*_)
+            1 * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
         and: 'the network service is invoked'
             new PollingConditions().within(1) {
                 assert networkServiceMethodCalled == true
@@ -143,14 +143,15 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
 
     def 'Attempt to execute async data operation request with too many cm handles.'() {
         given: 'a data operation definition with too many cm handles'
-            def cmHandleIds = new String[51]
+            def tooMany = objectUnderTest.MAXIMUM_CM_HANDLES_PER_OPERATION+1
+            def cmHandleIds = new String[tooMany]
             def dataOperationDefinition = new DataOperationDefinition(operationId: 'abc', operation: 'read', datastore: 'ncmp-datastore:passthrough-running', cmHandleIds: cmHandleIds)
         when: 'data operation request is executed'
             objectUnderTest.executeRequest('someTopic', new DataOperationRequest(dataOperationDefinitions:[dataOperationDefinition]), NO_AUTH_HEADER)
         then: 'a payload too large exception is thrown'
             def exceptionThrown = thrown(PayloadTooLargeException)
         and: 'the error message contains the offending number of cm handles'
-            assert exceptionThrown.message == "Operation 'abc' affects too many (51) cm handles"
+            assert exceptionThrown.message == "Operation 'abc' affects too many (${tooMany}) cm handles"
     }
 
 }
index 010eda9..4c8c40f 100644 (file)
@@ -33,6 +33,7 @@ class CpsNcmpTaskExecutorSpec extends Specification {
     def objectUnderTest = new CpsNcmpTaskExecutor()
     def logger = Spy(ListAppender<ILoggingEvent>)
     def enoughTime = 100
+    def notEnoughTime = 10
 
     void setup() {
         ((Logger) LoggerFactory.getLogger(CpsNcmpTaskExecutor.class)).addAppender(logger)
@@ -67,6 +68,18 @@ class CpsNcmpTaskExecutorSpec extends Specification {
             assert loggingEvent.formattedMessage.contains('original exception message')
     }
 
+    def 'Task times out.'() {
+        when: 'task is executed without enough time to complete'
+            objectUnderTest.executeTask(taskSupplierForLongRunningTask(), notEnoughTime)
+        then: 'an event is logged with level ERROR'
+            new PollingConditions().within(1) {
+                def loggingEvent = getLoggingEvent()
+                assert loggingEvent.level == Level.ERROR
+            }
+        and: 'a timeout error message is logged'
+            assert loggingEvent.formattedMessage.contains('java.util.concurrent.TimeoutException')
+    }
+
     def taskSupplier() {
         return () -> 'hello world'
     }
@@ -75,6 +88,10 @@ class CpsNcmpTaskExecutorSpec extends Specification {
         return () -> { throw new RuntimeException('original exception message') }
     }
 
+    def taskSupplierForLongRunningTask() {
+        return () -> { sleep(enoughTime) }
+    }
+
     def getLoggingEvent() {
         return logger.list[0]
     }
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation
+ *  Copyright (C) 2023-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -31,30 +31,32 @@ import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter;
 import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext;
 import org.onap.cps.utils.JsonObjectMapper;
 
-@Builder(buildMethodName = "setCloudEvent")
-public class NcmpCloudEventBuilder {
+@Builder
+public class NcmpEvent {
 
-    private Object event;
+    private Object data;
     private Map<String, String> extensions;
     private String type;
     @Builder.Default
-    private static final String EVENT_SPEC_VERSION_V1 = "1.0.0";
+    private static final String CLOUD_EVENT_SPEC_VERSION_V1 = "1.0.0";
+    @Builder.Default
+    private static final String CLOUD_EVENT_SOURCE = "NCMP";
 
     /**
      * Creates ncmp cloud event with provided attributes.
      *
      * @return Cloud Event
      */
-    public CloudEvent build() {
+    public CloudEvent asCloudEvent() {
         final JsonObjectMapper jsonObjectMapper = CpsApplicationContext.getCpsBean(JsonObjectMapper.class);
         final CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1()
                 .withId(UUID.randomUUID().toString())
-                .withSource(URI.create("NCMP"))
+                .withSource(URI.create(CLOUD_EVENT_SOURCE))
                 .withType(type)
-                .withDataSchema(URI.create("urn:cps:" + type + ":" + EVENT_SPEC_VERSION_V1))
+                .withDataSchema(URI.create("urn:cps:" + type + ":" + CLOUD_EVENT_SPEC_VERSION_V1))
                 .withTime(EventDateTimeFormatter.toIsoOffsetDateTime(
                         EventDateTimeFormatter.getCurrentIsoFormattedDateTime()))
-                .withData(jsonObjectMapper.asJsonBytes(event));
+                .withData(jsonObjectMapper.asJsonBytes(data));
         extensions.entrySet().stream()
                 .filter(extensionEntry -> StringUtils.isNotBlank(extensionEntry.getValue()))
                 .forEach(extensionEntry ->
index 9bd1119..7afe606 100644 (file)
@@ -26,7 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import org.onap.cps.events.EventsPublisher;
-import org.onap.cps.ncmp.api.impl.events.NcmpCloudEventBuilder;
+import org.onap.cps.ncmp.api.impl.events.NcmpEvent;
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc;
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent;
 import org.onap.cps.ncmp.events.avc.ncmp_to_client.Data;
@@ -53,8 +53,8 @@ public class AvcEventPublisher {
 
         final Map<String, String> extensions = createAvcEventExtensions(eventKey);
         final CloudEvent avcCloudEvent =
-            NcmpCloudEventBuilder.builder().type(AvcEvent.class.getTypeName())
-            .event(avcEvent).extensions(extensions).setCloudEvent().build();
+            NcmpEvent.builder().type(AvcEvent.class.getTypeName())
+            .data(avcEvent).extensions(extensions).build().asCloudEvent();
 
         eventsPublisher.publishCloudEvent(avcTopic, eventKey, avcCloudEvent);
     }
index 8c1cac3..4b3a085 100644 (file)
@@ -119,8 +119,8 @@ public class DmiCmNotificationSubscriptionCacheHandler {
 
             for (final String cmHandle: cmHandles) {
                 for (final String xpath: xpaths) {
-                    cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,
-                        cmHandle, xpath, subscriptionId);
+                    cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType, cmHandle,
+                            xpath, subscriptionId);
                 }
             }
         }
index 6b02adb..3bb40c3 100644 (file)
@@ -31,9 +31,9 @@ public interface CmNotificationSubscriptionPersistenceService {
     /**
      * Check if we have an ongoing cm subscription based on the parameters.
      *
-     * @param datastoreType valid datastore type
-     * @param cmHandleId    cmhandle id
-     * @param xpath         valid xpath
+     * @param datastoreType the susbcription target datastore type
+     * @param cmHandleId the id of the cm handle for the susbcription
+     * @param xpath the target xpath
      * @return true for ongoing cmsubscription , otherwise false
      */
     boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
@@ -50,22 +50,35 @@ public interface CmNotificationSubscriptionPersistenceService {
     /**
      * Get all ongoing cm notification subscription based on the parameters.
      *
-     * @param datastoreType valid datastore type
-     * @param cmHandleId    cmhandle id
-     * @param xpath         valid xpath
+     * @param datastoreType the susbcription target datastore type
+     * @param cmHandleId the id of the cm handle for the susbcription
+     * @param xpath the target xpath
      * @return collection of subscription ids of ongoing cm notification subscription
      */
     Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType,
             final String cmHandleId, final String xpath);
 
     /**
-     * Add or update cm notification subscription.
+     * Add cm notification subscription.
      *
-     * @param datastoreType valid datastore type
-     * @param cmHandle cmhandle id
-     * @param xpath valid xpath
-     * @param newSubscriptionId subscription Id to be added
+     * @param datastoreType the susbcription target datastore type
+     * @param cmHandleId the id of the cm handle for the susbcription
+     * @param xpath the target xpath
+     * @param newSubscriptionId subscription id to be added
      */
-    void addOrUpdateCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandle,
-                                               final String xpath, final String newSubscriptionId);
+    void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
+                                       final String xpath, final String newSubscriptionId);
+
+    /**
+     * Remove cm notification Subscription.
+     *
+     * @param datastoreType the susbcription target datastore type
+     * @param cmHandleId the id of the cm handle for the susbcription
+     * @param xpath the target xpath
+     * @param subscriptionId subscription id to remove
+     */
+    void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
+                                          final String xpath, final String subscriptionId);
+
 }
+
index 2efd321..92f3459 100644 (file)
@@ -24,7 +24,6 @@ import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS;
 
 import java.io.Serializable;
 import java.time.OffsetDateTime;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -60,7 +59,7 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
 
     @Override
     public boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
-            final String xpath) {
+                                                       final String xpath) {
         return !getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty();
     }
 
@@ -73,7 +72,7 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
 
     @Override
     public Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType,
-            final String cmHandleId, final String xpath) {
+                                                                      final String cmHandleId, final String xpath) {
 
         final String isOngoingCmSubscriptionCpsPathQuery =
                 CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
@@ -88,45 +87,77 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
     }
 
     @Override
-    public void addOrUpdateCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
-                                                      final String xpath, final String newSubscriptionId) {
-        if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)) {
-            final DataNode existingFilterNode =
-                    cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
-                            CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
-                                    escapeQuotesByDoublingThem(xpath)),
-                            OMIT_DESCENDANTS).iterator().next();
-            final Collection<String> existingSubscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
+    public void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
+                                              final String xpath, final String subscriptionId) {
+        if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)
+                && (!getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath)
+                .contains(subscriptionId))) {
+            final DataNode subscriptionAsDataNode = getSubscriptionAsDataNode(datastoreType, cmHandleId, xpath);
+            final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
                     cmHandleId, xpath);
-            if (!existingSubscriptionIds.contains(newSubscriptionId)) {
-                updateListOfSubscribers(existingSubscriptionIds, newSubscriptionId, existingFilterNode);
-            }
+            subscriptionIds.add(subscriptionId);
+            saveSubscriptionDetails(subscriptionAsDataNode, subscriptionIds);
+        } else {
+            addNewSubscriptionViaDatastore(datastoreType, cmHandleId, xpath, subscriptionId);
+        }
+    }
+
+    @Override
+    public void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
+                                                 final String xpath, final String subscriptionId) {
+        final DataNode subscriptionAsDataNode = getSubscriptionAsDataNode(datastoreType, cmHandleId, xpath);
+        final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
+                cmHandleId, xpath);
+        subscriptionIds.remove(subscriptionId);
+        saveSubscriptionDetails(subscriptionAsDataNode, subscriptionIds);
+        if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)) {
+            log.info("There are subscribers left for the following cps path {} :",
+                    CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
+                            escapeQuotesByDoublingThem(xpath)));
         } else {
-            addNewSubscriptionViaDatastore(datastoreType, cmHandleId, xpath, newSubscriptionId);
+            log.info("No subscribers left for the following cps path {} :",
+                    CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
+                            escapeQuotesByDoublingThem(xpath)));
+            deleteListOfSubscriptionsFor(datastoreType, cmHandleId, xpath);
         }
     }
 
+    private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId,
+                                              final String xpath) {
+        cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+                CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
+                        escapeQuotesByDoublingThem(xpath)),
+                OffsetDateTime.now());
+    }
+
+    private DataNode getSubscriptionAsDataNode(final DatastoreType datastoreType, final String cmHandleId,
+                                               final String xpath) {
+        return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+                CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
+                        escapeQuotesByDoublingThem(xpath)),
+                OMIT_DESCENDANTS).iterator().next();
+    }
+
     private void addNewSubscriptionViaDatastore(final DatastoreType datastoreType, final String cmHandleId,
                                                 final String xpath, final String newSubscriptionId) {
         final String parentXpath = "/datastores/datastore[@name='%s']/cm-handles"
                 .formatted(datastoreType.getDatastoreName());
-        final String updatedJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":{\"filter\":"
+        final String subscriptionAsJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":{\"filter\":"
                 + "[{\"xpath\":\"%s\",\"subscriptionIds\":[\"%s\"]}]}}]}", cmHandleId, xpath, newSubscriptionId);
-        cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, updatedJson,
+        cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, subscriptionAsJson,
                 OffsetDateTime.now(), ContentType.JSON);
     }
 
-    private void updateListOfSubscribers(final Collection<String> existingSubscriptionIds,
-                                         final String newSubscriptionId, final DataNode existingFilterNode) {
-        final String parentXpath = CpsPathUtil.getNormalizedParentXpath(existingFilterNode.getXpath());
-        final List<String> updatedSubscribers = new ArrayList<>(existingSubscriptionIds);
-        updatedSubscribers.add(newSubscriptionId);
-        final Map<String, Serializable> updatedLeaves = new HashMap<>();
-        updatedLeaves.put("xpath", existingFilterNode.getLeaves().get("xpath"));
-        updatedLeaves.put("subscriptionIds", (Serializable) updatedSubscribers);
-        final String updatedJson = "{\"filter\":[" + jsonObjectMapper.asJsonString(updatedLeaves) + "]}";
-        cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, updatedJson,
-                OffsetDateTime.now());
+    private void saveSubscriptionDetails(final DataNode subscriptionDetailsAsDataNode,
+                                         final  Collection<String> subscriptionIds) {
+        final Map<String, Serializable> subscriptionDetailsAsMap = new HashMap<>();
+        subscriptionDetailsAsMap.put("xpath", subscriptionDetailsAsDataNode.getLeaves().get("xpath"));
+        subscriptionDetailsAsMap.put("subscriptionIds", (Serializable) subscriptionIds);
+        final String parentXpath = CpsPathUtil.getNormalizedParentXpath(subscriptionDetailsAsDataNode.getXpath());
+        final String subscriptionDetailsAsJson = "{\"filter\":["
+                + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap).replace("'", "\"") + "]}";
+        cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath,
+                subscriptionDetailsAsJson, OffsetDateTime.now());
     }
 
     private static String escapeQuotesByDoublingThem(final String inputXpath) {
index 61da706..42bad89 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation
+ *  Copyright (C) 2023-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.NcmpResponseStatus;
-import org.onap.cps.ncmp.api.impl.events.NcmpCloudEventBuilder;
+import org.onap.cps.ncmp.api.impl.events.NcmpEvent;
 import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation;
 import org.onap.cps.ncmp.events.async1_0_0.Data;
 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
@@ -57,8 +57,8 @@ public class DataOperationEventCreator {
         final Data data = createPayloadFromDataOperationResponses(cmHandleIdsPerResponseCodesPerOperation);
         dataOperationEvent.setData(data);
         final Map<String, String> extensions = createDataOperationExtensions(requestId, clientTopic);
-        return NcmpCloudEventBuilder.builder().type(DataOperationEvent.class.getName())
-                .event(dataOperationEvent).extensions(extensions).setCloudEvent().build();
+        return NcmpEvent.builder().type(DataOperationEvent.class.getName())
+                .data(dataOperationEvent).extensions(extensions).build().asCloudEvent();
     }
 
     private static Data createPayloadFromDataOperationResponses(final MultiValueMap<DmiDataOperation,
index a8b4e28..4b016b3 100644 (file)
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -109,28 +110,80 @@ public class ResourceDataOperationRequestUtils {
                     DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
                     CM_HANDLES_NOT_READY, nonReadyCmHandleIds);
         }
-        if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
-            publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
-        }
+        publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
         return dmiDataOperationsOutPerDmiServiceName;
     }
 
+    /**
+     * Handles the async task completion for an entire data, publishing errors to client topic on task failure.
+     *
+     * @param topicParamInQuery      client given topic
+     * @param requestId              unique identifier per request
+     * @param dataOperationRequest   incoming data operation request details
+     * @param throwable              error cause, or null if task completed with no exception
+     */
+    public static void handleAsyncTaskCompletionForDataOperationsRequest(
+            final String topicParamInQuery,
+            final String requestId,
+            final DataOperationRequest dataOperationRequest,
+            final Throwable throwable) {
+        if (throwable == null) {
+            log.info("Data operations request {} completed.", requestId);
+        } else if (throwable instanceof TimeoutException) {
+            log.error("Data operations request {} timed out.", requestId);
+            ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+                    requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING);
+        } else {
+            log.error("Data operations request {} failed.", requestId, throwable);
+            ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+                    requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR);
+        }
+    }
+
+    /**
+     * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic.
+     *
+     * @param topicParamInQuery      client given topic
+     * @param requestId              unique identifier per request
+     * @param dataOperationRequestIn incoming data operation request details
+     * @param ncmpResponseStatus     response code to be sent for all cm handle ids in all operations
+     */
+    private static void publishErrorMessageToClientTopicForEntireOperation(
+            final String topicParamInQuery,
+            final String requestId,
+            final DataOperationRequest dataOperationRequestIn,
+            final NcmpResponseStatus ncmpResponseStatus) {
+
+        final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
+                cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
+
+        for (final DataOperationDefinition dataOperationDefinitionIn :
+                dataOperationRequestIn.getDataOperationDefinitions()) {
+            cmHandleIdsPerResponseCodesPerOperation.add(
+                    DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
+                    Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds()));
+        }
+        publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
+    }
+
     /**
      * Creates data operation cloud event and publish it to client topic.
      *
      * @param clientTopic                              client given topic
      * @param requestId                                unique identifier per request
-     * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code
+     * @param cmHandleIdsPerResponseCodesPerOperation  list of cm handle ids per operation with response code
      */
     public static void publishErrorMessageToClientTopic(final String clientTopic,
                                                          final String requestId,
                                                          final MultiValueMap<DmiDataOperation,
                                                                  Map<NcmpResponseStatus, List<String>>>
                                                                     cmHandleIdsPerResponseCodesPerOperation) {
-        final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
-                requestId, cmHandleIdsPerResponseCodesPerOperation);
-        final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
-        eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+        if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
+            final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
+                    requestId, cmHandleIdsPerResponseCodesPerOperation);
+            final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+            eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+        }
     }
 
     private static Map<String, String> getDmiServiceNamesPerCmHandleId(
index 47a1c89..10e060f 100644 (file)
@@ -139,7 +139,7 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
         when: 'subscription is persisted in database'
             objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
         then: 'persistence service is called the correct number of times per dmi'
-            4 * mockCmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(_,_,_,subscriptionId)
+            4 * mockCmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(_,_,_,subscriptionId)
     }
 
     def setUpTestEvent(){
index 19ebc3d..13a20a1 100644 (file)
@@ -71,12 +71,12 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
 
     def 'Add new subscriber to an ongoing cm notification subscription'() {
         given: 'a valid cm subscription path query'
-            def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y');
+            def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
         and: 'a dataNode exists for the given cps path query'
              mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
                 cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1']])]
         when: 'the method to add/update cm notification subscription is called'
-            objectUnderTest.addOrUpdateCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1','/x/y', 'newSubId')
+            objectUnderTest.addCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1','/x/y', 'newSubId')
         then: 'data service method to update list of subscribers is called once'
             1 * mockCpsDataService.updateNodeLeaves(
                 'NCMP-Admin',
@@ -95,7 +95,7 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
                 cpsPathQuery.formatted(datastoreName),
                 FetchDescendantsOption.OMIT_DESCENDANTS) >> []
         when: 'the method to add/update cm notification subscription is called'
-            objectUnderTest.addOrUpdateCmNotificationSubscription(datastoreType, 'ch-1','/x/y', 'newSubId')
+            objectUnderTest.addCmNotificationSubscription(datastoreType, 'ch-1','/x/y', 'newSubId')
         then: 'data service method to update list of subscribers is called once with the correct parameters'
             1 * mockCpsDataService.saveData(
                 'NCMP-Admin',
@@ -107,4 +107,30 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
             'passthrough_running'     | DatastoreType.PASSTHROUGH_RUNNING      || "ncmp-datastore:passthrough-running"
             'passthrough_operational' | DatastoreType.PASSTHROUGH_OPERATIONAL  || "ncmp-datastore:passthrough-operational"
     }
+
+    def 'Remove subscriber from a list of an ongoing cm notification subscription'() {
+        given: 'a subscription exists when queried'
+            def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
+            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
+                cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1', 'sub-2']])]
+        when: 'the subscriber is removed'
+            objectUnderTest.removeCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
+        then: 'the list of subscribers is updated'
+            1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-subscriptions',
+                '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters',
+                '{"filter":[{"xpath":"/x/y","subscriptionIds":["sub-2"]}]}', _)
+    }
+
+    def 'Removing ongoing subscription with no subscribers'(){
+        given: 'a subscription exists when queried but has no subscribers'
+            def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
+            mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
+                cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': []])]
+        when: 'a an ongoing subscription is refreshed'
+            objectUnderTest.removeCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
+        then: 'the subscription with empty subscriber list is removed'
+            1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-subscriptions',
+                '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters/filter[@xpath=\'/x/y\']',
+                _)
+    }
 }
\ No newline at end of file
index 5690b8f..8df27bb 100644 (file)
@@ -26,6 +26,7 @@ import io.cloudevents.kafka.CloudEventDeserializer
 import io.cloudevents.kafka.impl.KafkaHeaders
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.cps.events.EventsPublisher
+import org.onap.cps.ncmp.api.NcmpResponseStatus
 import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
 import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
@@ -38,14 +39,15 @@ import org.onap.cps.utils.JsonObjectMapper
 import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.test.context.ContextConfiguration
+
 import java.time.Duration
+import java.util.concurrent.TimeoutException
 
 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
 
 @ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper])
 class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
 
-    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
     def static clientTopic = 'my-topic-name'
     def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
 
@@ -90,6 +92,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
 
     def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() {
         given: 'consumer subscribing to client topic'
+            def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer))
             cloudEventKafkaConsumer.subscribe([clientTopic])
         and: 'data operation request having non-ready and non-existing cm handle ids'
             def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
@@ -97,7 +100,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
         when: 'data operation request is processed'
             ResourceDataOperationRequestUtils.processPerDefinitionInDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, yangModelCmHandles)
         and: 'subscribed client specified topic is polled and first record is selected'
-            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
         then: 'verify cloud compliant headers'
             def consumerRecordOutHeaders = consumerRecordOut.headers()
             assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') != null
@@ -111,10 +114,34 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
         and: 'data operation response event response size is 3'
             dataOperationResponseEvent.data.responses.size() == 3
         and: 'verify published data operation response as json string'
-        def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
+            def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
             jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
     }
 
+    def 'Publish error response for entire data operations request when async task fails'() {
+        given: 'consumer subscribing to client topic'
+            def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer))
+            cloudEventKafkaConsumer.subscribe([clientTopic])
+        and: 'data operation request having non-ready and non-existing cm handle ids'
+            def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
+            def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
+        when: 'an error occurs for the entire data operations request'
+            ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown)
+        and: 'subscribed client specified topic is polled and first record is selected'
+            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
+            def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
+        then: 'data operation response event response size is 3'
+            dataOperationResponseEvent.data.responses.size() == 3
+        and: 'all 3 have the expected error code'
+            dataOperationResponseEvent.data.responses.each {
+                assert it.statusCode == errorReportedToClientTopic.code
+            }
+        where:
+            scenario             | exceptionThrown        | consumerGroupId || errorReportedToClientTopic
+            'task timed out'     | new TimeoutException() | 'test-2'        || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING
+            'unspecified error'  | new RuntimeException() | 'test-3'        || NcmpResponseStatus.UNKNOWN_ERROR
+    }
+
     static def getYangModelCmHandles() {
         def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
         def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()
index 018cf4a..0c6ce0e 100644 (file)
@@ -16,7 +16,7 @@ CPS-NCMP Message Status Codes
     +-----------------+------------------------------------------------------+-----------------------------------+
     | 1               | successfully applied subscription                    | CM Data Notification Subscription |
     +-----------------+------------------------------------------------------+-----------------------------------+
-    | 100             | cm handle id(s) is(are) not found                    | Data Operation, Inventory         |
+    | 100             | cm handle id(s) is(are) not found                    | All features                      |
     +-----------------+------------------------------------------------------+-----------------------------------+
     | 101             | cm handle(s) not ready                               | Data Operation                    |
     +-----------------+------------------------------------------------------+-----------------------------------+
@@ -32,7 +32,7 @@ CPS-NCMP Message Status Codes
     +-----------------+------------------------------------------------------+-----------------------------------+
     | 107             | southbound system is busy                            | Data Operation                    |
     +-----------------+------------------------------------------------------+-----------------------------------+
-    | 108             | Unknown error                                        | Inventory                         |
+    | 108             | Unknown error                                        | All features                      |
     +-----------------+------------------------------------------------------+-----------------------------------+
     | 109             | cm-handle already exists                             | Inventory                         |
     +-----------------+------------------------------------------------------+-----------------------------------+
index d35ed99..f04f977 100644 (file)
@@ -36,6 +36,11 @@ Release Data
 |                                      |                                                        |
 +--------------------------------------+--------------------------------------------------------+
 
+Bug Fixes
+---------
+3.4.8
+    - `CPS-2186 <https://jira.onap.org/browse/CPS-2186>`_ Report async task failures to client topic during data operations request
+
 Features
 --------
 
index df74a05..9129f09 100644 (file)
@@ -18,7 +18,7 @@ class NcmpCmNotificationSubscriptionSpec extends CpsIntegrationSpecBase {
             assert cmNotificationSubscriptionPersistenceService.
                 getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size() == 0
         when: 'we add a new cm notification subscription'
-            cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,cmHandleId,xpath,
+            cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType,cmHandleId,xpath,
                 'subId-1')
         then: 'there is an ongoing cm subscription for that CM handle and xpath'
             assert cmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(datastoreType,cmHandleId,xpath)
@@ -27,15 +27,13 @@ class NcmpCmNotificationSubscriptionSpec extends CpsIntegrationSpecBase {
                 getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size() == 1
     }
 
-    def 'Adding a cm notification subscription to an already existing'() {
-        given: 'an ongoing cm subscription'
+    def 'Adding a cm notification subscription to the already existing'() {
+        given: 'an ongoing cm subscription with the following details'
             def datastoreType = PASSTHROUGH_RUNNING
             def cmHandleId = 'ch-1'
             def xpath = '/x/y'
-            cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,cmHandleId,xpath,
-                'subId-1')
         when: 'a new cm notification subscription is made for the SAME CM handle and xpath'
-            cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,cmHandleId,xpath,
+            cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType,cmHandleId,xpath,
                 'subId-2')
         then: 'it is added to the ongoing list of subscription ids'
             def subscriptionIds = cmNotificationSubscriptionPersistenceService.getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath)
@@ -43,4 +41,35 @@ class NcmpCmNotificationSubscriptionSpec extends CpsIntegrationSpecBase {
         and: 'both subscription ids exists for the CM handle and xpath'
             assert subscriptionIds.contains("subId-1") && subscriptionIds.contains("subId-2")
     }
+
+    def 'Removing cm notification subscriber among other subscribers'() {
+        given: 'an ongoing cm subscription with the following details'
+            def datastoreType = PASSTHROUGH_RUNNING
+            def cmHandleId = 'ch-1'
+            def xpath = '/x/y'
+        and: 'the number of subscribers is as follows'
+            def originalNumberOfSubscribers =
+                cmNotificationSubscriptionPersistenceService.getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size()
+        when: 'a subscriber is removed'
+            cmNotificationSubscriptionPersistenceService.removeCmNotificationSubscription(datastoreType,cmHandleId,xpath,'subId-2')
+        then: 'the number of subscribers is reduced by 1'
+            def updatedNumberOfSubscribers =
+                cmNotificationSubscriptionPersistenceService.getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size()
+            assert updatedNumberOfSubscribers == originalNumberOfSubscribers-1
+    }
+
+    def 'Removing the LAST cm notification subscriber for a given cm handle, datastore and xpath'() {
+        given: 'an ongoing cm subscription with the following details'
+            def datastoreType = PASSTHROUGH_RUNNING
+            def cmHandleId = 'ch-1'
+            def xpath = '/x/y'
+        and: 'there is only one subscriber'
+            assert cmNotificationSubscriptionPersistenceService
+                .getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size() == 1
+        when: 'only subscriber is removed'
+            cmNotificationSubscriptionPersistenceService.removeCmNotificationSubscription(datastoreType,cmHandleId,xpath,'subId-1')
+        then: 'there are no longer any subscriptions for the cm handle, datastore and xpath'
+            assert !cmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)
+    }
+
 }
index 3d61bdb..6fd3bca 100644 (file)
@@ -48,7 +48,7 @@ spring:
       minimumIdle: 5
       maximumPoolSize: 80
       idleTimeout: 60000
-      connectionTimeout: 120000
+      connectionTimeout: 30000
       leakDetectionThreshold: 30000
       pool-name: CpsDatabasePool
 
@@ -120,7 +120,7 @@ notification:
       queue-capacity: 500
       wait-for-tasks-to-complete-on-shutdown: true
       thread-name-prefix: Async-
-      time-out-value-in-ms: 2000
+      time-out-value-in-ms: 60000
 
 springdoc:
   swagger-ui:
@@ -165,7 +165,7 @@ logging:
 ncmp:
   dmi:
     httpclient:
-      connectionTimeoutInSeconds: 180
+      connectionTimeoutInSeconds: 30
       maximumConnectionsPerRoute: 50
       maximumConnectionsTotal: 100
       idleConnectionEvictionThresholdInSeconds: 5